LoginSignup
42
36

More than 5 years have passed since last update.

ズンドコキヨシ with RxSwift 最終版と試行錯誤の歴史

Last updated at Posted at 2016-03-29

ズンドコキヨシ with RxSwift 最終版

前作「ズンドコキヨシ with RxSwift」は、とりあえずサクっとRxSwiftで書いてみたバージョンだったので、ちゃんと動いてはいるもののいくつか不満がありました。

  1. 出力がdoOnNextsubscribeCompletedに分かれている
  2. 終了判定のために本流のストリームを変形している
  3. リズム感アップのためにキ・ヨ・シ!前にタイミング調整をしている
  4. scanを使っているところにRxっぽさが欠けるw

上記を解消しようと色々試行錯誤した結果、できたのが以下です。自分の中ではこれを最終版としました。

import Foundation
import RxSwift

enum ズンドコ: UInt32 {
    case ズン
    case ドコ
}

let ズンドコ川 = Observable<Int>.interval(0.5, scheduler: MainScheduler.instance)
    .map { _ in ズンドコ(rawValue: arc4random_uniform(2))! }
    .shareReplay(1)

let キヨシトリガー = [ズンドコ川, ズンドコ川.skip(1), ズンドコ川.skip(2), ズンドコ川.skip(3), ズンドコ川.skip(4)]
    .zip { $0 }
    .skipWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] }
    .skip(1)

_ = ズンドコ川
    .takeUntil(キヨシトリガー)
    .subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })

このプログラムの説明も兼ねて、ここに至るまでの試行錯誤を書いてみたいと思います。興味のある方は続きをご覧ください。

出力を一箇所にまとめてみる

元々、プログラムは以下で終わりにしたかったんです。

    ...
    .subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })

ですが、最初に書いたバージョンでは、途中経過の出力だけをdoOnNextでやっています。

_ = ズンドコ川
    .doOnNext { print($0) }
    .scan([]) { Array(($0 + [$1]).suffix(5)) }
    .takeWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] }
    .concat(Observable.never().take(0.5, scheduler: MainScheduler.instance))
    .subscribeCompleted { print("キ・ヨ・シ!") }

これは、判定のためにストリームを変形してしまっているので変形前に出力したかったのと、takeWhileの仕様によりこうなっています。

試しにdoOnNextをやめて、ストリームの終了判定後に元の形に戻してみます。

_ = ズンドコ川
    .scan([]) { Array(($0 + [$1]).suffix(5)) }
    .takeWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] as [ズンドコ] }
    .map { $0.last! }
    .concat(Observable.never().take(0.5, scheduler: MainScheduler.instance))
    .subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })

一見うまくいきそうですが、これだとキ・ヨ・シ!前のドコが表示されません。これはtakeWhileが終了条件を満たしたときの値を流さないからです。

takeWhile
http://reactivex.io/documentation/operators/takewhile.html より

場当たり的に対応するには、最後はドコで終わるのがわかっているので、それを追加してあげればよいです。

_ = ズンドコ川
    .scan([]) { Array(($0 + [$1]).suffix(5)) }
    .takeWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] as [ズンドコ] }
    .map { $0.last! }
    .concat(Observable.just(ズンドコ.ドコ))
    .concat(Observable.never().take(0.5, scheduler: MainScheduler.instance))
    .subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })

ただ、これだと最後に付加する値が終了条件に依存してしまうので、もうちょっとがんばってみます。takeWhileと反対の動きをするskipWhileでもう一本ストリームを作って、最初の一個をtakeしたものをmergeしてあげればよさそうです。

skipWhile
http://reactivex.io/documentation/operators/skipwhile.html より

プログラム全体はこうなりました。一気に複雑になってしまいました…。

import Foundation
import RxSwift

enum ズンドコ: UInt32 {
    case ズン
    case ドコ
}

let ズンドコ川 = Observable<Int>.interval(0.5, scheduler: MainScheduler.instance)
    .map { _ in ズンドコ(rawValue: arc4random_uniform(2))! }

let 直近5件 = ズンドコ川
    .scan([]) { Array(($0 + [$1]).suffix(5)) }
    .shareReplay(1)

let キヨシチェック: [ズンドコ] -> Bool = { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] }

_ = Observable
    .of(
        直近5件.takeWhile(キヨシチェック),
        直近5件.skipWhile(キヨシチェック).take(1)
    )
    .merge()
    .map { $0.last! }
    .concat(Observable.never().take(0.5, scheduler: MainScheduler.instance))
    .subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })

ストリームを出力と終了トリガーに分けてみる

複雑になってしまうのは終了判定と出力を一本のストリームでやっているからなので、これを分けることを考えます。以下のようになるのが理想的です。

_ = ズンドコ川
    .takeUntil(キヨシトリガー)
    .subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })

takeUntilは、引数に与えたストリームに最初の値が流れてきたときに、本流のストリームを完了するオペレータです。最後にやっていたタイミング調整もこれで一緒にやってしまいましょう。

ここでキヨシトリガーは「[ズン,ズン,ズン,ズン,ドコ]になったタイミングの次のタイミングから値が流れはじめるストリーム」にすればよさそうです。このストリームはskipWhileskipを組み合わせるとシンプルに作れます。

let キヨシトリガー = ズンドコ川
    .scan([]) { Array(($0 + [$1]).suffix(5)) }
    .skipWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] }
    .skip(1)

キヨシトリガーの実装も含め、プログラム全体はこうなりました。だいぶスッキリしましたね。

import Foundation
import RxSwift

enum ズンドコ: UInt32 {
    case ズン
    case ドコ
}

let ズンドコ川 = Observable<Int>.interval(0.5, scheduler: MainScheduler.instance)
    .map { _ in ズンドコ(rawValue: arc4random_uniform(2))! }
    .shareReplay(1)

let キヨシトリガー = ズンドコ川
    .scan([]) { Array(($0 + [$1]).suffix(5)) }
    .skipWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] }
    .skip(1)

_ = ズンドコ川
    .takeUntil(キヨシトリガー)
    .subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })

ただ、上記のように一本のストリームから分配したストリームでtakeUntilしているのですが、値の流れてくるタイミングがきちんと同期しているのかは、ちょっと心配ではあります。実装を追っていないのでなんとも言えませんが、一応ちゃんと動いているようです。

追記

ちょっと気になったので、とりあえず以下のようにdebugを挿入して観察してみたところ、必ずトリガーの方から先に値が流れてきているように見えます。shareReplayされているズンドコ川をsubscribeした順に値を流しているのかな…?

let キヨシトリガー = ズンドコ川
    .debug("T1")
    .scan([]) { Array(($0 + [$1]).suffix(5)) }
    .skipWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] }
    .skip(1)
    .debug("T2")

_ = ズンドコ川
    .debug("O1")
    .takeUntil(キヨシトリガー)
    .subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })
2016-04-01 00:48:50.757: T2 -> subscribed
2016-04-01 00:48:50.759: T1 -> subscribed
2016-04-01 00:48:50.759: O1 -> subscribed
2016-04-01 00:48:51.260: T1 -> Event Next(ズン)
2016-04-01 00:48:51.261: O1 -> Event Next(ズン)
ズン
2016-04-01 00:48:51.761: T1 -> Event Next(ドコ)
2016-04-01 00:48:51.762: O1 -> Event Next(ドコ)
ズン
...
2016-04-01 00:48:55.260: T1 -> Event Next(ズン)
2016-04-01 00:48:55.261: O1 -> Event Next(ズン)
ズン
2016-04-01 00:48:55.760: T1 -> Event Next(ズン)
2016-04-01 00:48:55.761: O1 -> Event Next(ズン)
ズン
2016-04-01 00:48:56.260: T1 -> Event Next(ドコ)
2016-04-01 00:48:56.261: O1 -> Event Next(ドコ)
ドコ
2016-04-01 00:48:56.760: T1 -> Event Next(ドコ)
2016-04-01 00:48:56.761: T2 -> Event Next([ズンドコ.ズン, ズンドコ...., ズンドコ.ドコ, ズンドコ.ドコ])
キ・ヨ・シ!
2016-04-01 00:48:56.762: O1 -> disposed
2016-04-01 00:48:56.762: T2 -> disposed
2016-04-01 00:48:56.762: T1 -> disposed
2016-04-01 00:48:56.762: O1 -> Event Next(ドコ)

引き続きスケジューリングや同期周りの実装も調べてみます。なお、トリガーの方に値が流れてきたときに本流を終了する処理は、以下が起点になっているようです。
https://github.com/ReactiveX/RxSwift/blob/master/RxSwift/Observables/Implementations/TakeUntil.swift#L40

scanをやめてみる

最初のバージョンを投稿したあとに同僚から「Rx だとソースを5個に分配して、遅延なし・1遅延・2遅延・3遅延・4遅延にして、それをまとめて受け取ってズンズンズンズンドコと一致するか見るのが綺麗そう」と言われました。確かに!

遅延させるにはストリームの前に何か値を追加すればよいのですが、今回はズンドコをenumで表現しており空の値に該当するものがありません。そこで発想を変えて、1つずつ先頭からskipしたストリームをzipすることで同等の動きを実現してみました。この方法だと源流に値が5個流れてくるまで待機する形になりますが、出力系のストリームとは分けたおかげで問題にはなりません。

そうしてキヨシトリガーを変更したのが、一番最初に貼り付けた最終版です。以下再掲。上記のscanの部分が5本のストリームのzipに変わっただけです。

import Foundation
import RxSwift

enum ズンドコ: UInt32 {
    case ズン
    case ドコ
}

let ズンドコ川 = Observable<Int>.interval(0.5, scheduler: MainScheduler.instance)
    .map { _ in ズンドコ(rawValue: arc4random_uniform(2))! }
    .shareReplay(1)

let キヨシトリガー = [ズンドコ川, ズンドコ川.skip(1), ズンドコ川.skip(2), ズンドコ川.skip(3), ズンドコ川.skip(4)]
    .zip { $0 }
    .skipWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] }
    .skip(1)

_ = ズンドコ川
    .takeUntil(キヨシトリガー)
    .subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })

上記以外の試行錯誤

ここまで書いた以外にも色々な試行錯誤をしたのですが、全部書くとキリがないので一部だけ箇条書しておきます。

  • takeWhileskipWhilemergeするところで、最初concatしていたら期待通り動かなかった
  • 出力系と判定系を完全に同期させようとして、終了条件を満たしているかをBoolで流すストリームとzipする方法も試したけど、また複雑になってしまったのでやめた
  • withLatestFromsampletimeoutなども試したがイマイチだった

このネタでここまで試行錯誤することになるとは思いませんでしたw

まとめ

元ネタである以下のツイートを読んで「なんだかストリーム向きだな」と思ったのがきっかけでRxSwift版を公開しましたが、他の言語による実装を見たり、試行錯誤の中でこれまで使ったことのないオペレータを試したりと、ずいぶん勉強になりました。普段からこういう問題でRx力を鍛えておくのはよさそうですね。

今回作成したプログラムをPlaygroundの形でまとめておきましたので、よかったらどうぞ。
https://github.com/bricklife/Zundoko-Kiyoshi-RxSwift

42
36
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
42
36