ズンドコキヨシ with RxSwift 最終版
前作「ズンドコキヨシ with RxSwift」は、とりあえずサクっとRxSwiftで書いてみたバージョンだったので、ちゃんと動いてはいるもののいくつか不満がありました。
- 出力が
doOnNext
とsubscribeCompleted
に分かれている - 終了判定のために本流のストリームを変形している
- リズム感アップのために
キ・ヨ・シ!
前にタイミング調整をしている -
scan
を使っているところにRxっぽさが欠けるw
上記を解消しようと色々試行錯誤した結果、できたのが以下です。自分の中ではこれを最終版としました。
import Foundation
import RxSwift
enum ズンドコ: UInt32 {
case ズン
case ドコ
}
let ズンドコ川 = Observable<Int>.interval(0.5, scheduler: MainScheduler.instance)
.map { _ in ズンドコ(rawValue: arc4random_uniform(2))! }
.shareReplay(1)
let キヨシトリガー = [ズンドコ川, ズンドコ川.skip(1), ズンドコ川.skip(2), ズンドコ川.skip(3), ズンドコ川.skip(4)]
.zip { $0 }
.skipWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] }
.skip(1)
_ = ズンドコ川
.takeUntil(キヨシトリガー)
.subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })
このプログラムの説明も兼ねて、ここに至るまでの試行錯誤を書いてみたいと思います。興味のある方は続きをご覧ください。
出力を一箇所にまとめてみる
元々、プログラムは以下で終わりにしたかったんです。
...
.subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })
ですが、最初に書いたバージョンでは、途中経過の出力だけをdoOnNext
でやっています。
_ = ズンドコ川
.doOnNext { print($0) }
.scan([]) { Array(($0 + [$1]).suffix(5)) }
.takeWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] }
.concat(Observable.never().take(0.5, scheduler: MainScheduler.instance))
.subscribeCompleted { print("キ・ヨ・シ!") }
これは、判定のためにストリームを変形してしまっているので変形前に出力したかったのと、takeWhile
の仕様によりこうなっています。
試しにdoOnNext
をやめて、ストリームの終了判定後に元の形に戻してみます。
_ = ズンドコ川
.scan([]) { Array(($0 + [$1]).suffix(5)) }
.takeWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] as [ズンドコ] }
.map { $0.last! }
.concat(Observable.never().take(0.5, scheduler: MainScheduler.instance))
.subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })
一見うまくいきそうですが、これだとキ・ヨ・シ!
前のドコ
が表示されません。これはtakeWhile
が終了条件を満たしたときの値を流さないからです。
http://reactivex.io/documentation/operators/takewhile.html より
場当たり的に対応するには、最後はドコ
で終わるのがわかっているので、それを追加してあげればよいです。
_ = ズンドコ川
.scan([]) { Array(($0 + [$1]).suffix(5)) }
.takeWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] as [ズンドコ] }
.map { $0.last! }
.concat(Observable.just(ズンドコ.ドコ))
.concat(Observable.never().take(0.5, scheduler: MainScheduler.instance))
.subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })
ただ、これだと最後に付加する値が終了条件に依存してしまうので、もうちょっとがんばってみます。takeWhile
と反対の動きをするskipWhile
でもう一本ストリームを作って、最初の一個をtake
したものをmerge
してあげればよさそうです。
http://reactivex.io/documentation/operators/skipwhile.html より
プログラム全体はこうなりました。一気に複雑になってしまいました…。
import Foundation
import RxSwift
enum ズンドコ: UInt32 {
case ズン
case ドコ
}
let ズンドコ川 = Observable<Int>.interval(0.5, scheduler: MainScheduler.instance)
.map { _ in ズンドコ(rawValue: arc4random_uniform(2))! }
let 直近5件 = ズンドコ川
.scan([]) { Array(($0 + [$1]).suffix(5)) }
.shareReplay(1)
let キヨシチェック: [ズンドコ] -> Bool = { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] }
_ = Observable
.of(
直近5件.takeWhile(キヨシチェック),
直近5件.skipWhile(キヨシチェック).take(1)
)
.merge()
.map { $0.last! }
.concat(Observable.never().take(0.5, scheduler: MainScheduler.instance))
.subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })
ストリームを出力と終了トリガーに分けてみる
複雑になってしまうのは終了判定と出力を一本のストリームでやっているからなので、これを分けることを考えます。以下のようになるのが理想的です。
_ = ズンドコ川
.takeUntil(キヨシトリガー)
.subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })
takeUntil
は、引数に与えたストリームに最初の値が流れてきたときに、本流のストリームを完了するオペレータです。最後にやっていたタイミング調整もこれで一緒にやってしまいましょう。
ここでキヨシトリガー
は「[ズン,ズン,ズン,ズン,ドコ]
になったタイミングの__次のタイミング__から値が流れはじめるストリーム」にすればよさそうです。このストリームはskipWhile
とskip
を組み合わせるとシンプルに作れます。
let キヨシトリガー = ズンドコ川
.scan([]) { Array(($0 + [$1]).suffix(5)) }
.skipWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] }
.skip(1)
キヨシトリガー
の実装も含め、プログラム全体はこうなりました。だいぶスッキリしましたね。
import Foundation
import RxSwift
enum ズンドコ: UInt32 {
case ズン
case ドコ
}
let ズンドコ川 = Observable<Int>.interval(0.5, scheduler: MainScheduler.instance)
.map { _ in ズンドコ(rawValue: arc4random_uniform(2))! }
.shareReplay(1)
let キヨシトリガー = ズンドコ川
.scan([]) { Array(($0 + [$1]).suffix(5)) }
.skipWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] }
.skip(1)
_ = ズンドコ川
.takeUntil(キヨシトリガー)
.subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })
ただ、上記のように一本のストリームから分配したストリームでtakeUntil
しているのですが、値の流れてくるタイミングがきちんと同期しているのかは、ちょっと心配ではあります。実装を追っていないのでなんとも言えませんが、一応ちゃんと動いているようです。
追記
ちょっと気になったので、とりあえず以下のようにdebug
を挿入して観察してみたところ、必ずトリガーの方から先に値が流れてきているように見えます。shareReplay
されているズンドコ川
をsubscribeした順に値を流しているのかな…?
let キヨシトリガー = ズンドコ川
.debug("T1")
.scan([]) { Array(($0 + [$1]).suffix(5)) }
.skipWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] }
.skip(1)
.debug("T2")
_ = ズンドコ川
.debug("O1")
.takeUntil(キヨシトリガー)
.subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })
2016-04-01 00:48:50.757: T2 -> subscribed
2016-04-01 00:48:50.759: T1 -> subscribed
2016-04-01 00:48:50.759: O1 -> subscribed
2016-04-01 00:48:51.260: T1 -> Event Next(ズン)
2016-04-01 00:48:51.261: O1 -> Event Next(ズン)
ズン
2016-04-01 00:48:51.761: T1 -> Event Next(ドコ)
2016-04-01 00:48:51.762: O1 -> Event Next(ドコ)
ズン
...
2016-04-01 00:48:55.260: T1 -> Event Next(ズン)
2016-04-01 00:48:55.261: O1 -> Event Next(ズン)
ズン
2016-04-01 00:48:55.760: T1 -> Event Next(ズン)
2016-04-01 00:48:55.761: O1 -> Event Next(ズン)
ズン
2016-04-01 00:48:56.260: T1 -> Event Next(ドコ)
2016-04-01 00:48:56.261: O1 -> Event Next(ドコ)
ドコ
2016-04-01 00:48:56.760: T1 -> Event Next(ドコ)
2016-04-01 00:48:56.761: T2 -> Event Next([ズンドコ.ズン, ズンドコ...., ズンドコ.ドコ, ズンドコ.ドコ])
キ・ヨ・シ!
2016-04-01 00:48:56.762: O1 -> disposed
2016-04-01 00:48:56.762: T2 -> disposed
2016-04-01 00:48:56.762: T1 -> disposed
2016-04-01 00:48:56.762: O1 -> Event Next(ドコ)
引き続きスケジューリングや同期周りの実装も調べてみます。なお、トリガーの方に値が流れてきたときに本流を終了する処理は、以下が起点になっているようです。
https://github.com/ReactiveX/RxSwift/blob/master/RxSwift/Observables/Implementations/TakeUntil.swift#L40
scanをやめてみる
最初のバージョンを投稿したあとに同僚から「Rx だとソースを5個に分配して、遅延なし・1遅延・2遅延・3遅延・4遅延にして、それをまとめて受け取ってズンズンズンズンドコと一致するか見るのが綺麗そう」と言われました。確かに!
遅延させるにはストリームの前に何か値を追加すればよいのですが、今回はズン
とドコ
をenumで表現しており空の値に該当するものがありません。そこで発想を変えて、1つずつ先頭からskip
したストリームをzip
することで同等の動きを実現してみました。この方法だと源流に値が5個流れてくるまで待機する形になりますが、出力系のストリームとは分けたおかげで問題にはなりません。
そうしてキヨシトリガー
を変更したのが、一番最初に貼り付けた最終版です。以下再掲。上記のscan
の部分が5本のストリームのzip
に変わっただけです。
import Foundation
import RxSwift
enum ズンドコ: UInt32 {
case ズン
case ドコ
}
let ズンドコ川 = Observable<Int>.interval(0.5, scheduler: MainScheduler.instance)
.map { _ in ズンドコ(rawValue: arc4random_uniform(2))! }
.shareReplay(1)
let キヨシトリガー = [ズンドコ川, ズンドコ川.skip(1), ズンドコ川.skip(2), ズンドコ川.skip(3), ズンドコ川.skip(4)]
.zip { $0 }
.skipWhile { $0 != [.ズン, .ズン, .ズン, .ズン, .ドコ] }
.skip(1)
_ = ズンドコ川
.takeUntil(キヨシトリガー)
.subscribe(onNext: { print($0) }, onCompleted: { print("キ・ヨ・シ!") })
上記以外の試行錯誤
ここまで書いた以外にも色々な試行錯誤をしたのですが、全部書くとキリがないので一部だけ箇条書しておきます。
-
takeWhile
とskipWhile
をmerge
するところで、最初concat
していたら期待通り動かなかった - 出力系と判定系を完全に同期させようとして、終了条件を満たしているかをBoolで流すストリームと
zip
する方法も試したけど、また複雑になってしまったのでやめた -
withLatestFrom
やsample
、timeout
なども試したがイマイチだった
このネタでここまで試行錯誤することになるとは思いませんでしたw
まとめ
元ネタである以下のツイートを読んで「なんだかストリーム向きだな」と思ったのがきっかけでRxSwift版を公開しましたが、他の言語による実装を見たり、試行錯誤の中でこれまで使ったことのないオペレータを試したりと、ずいぶん勉強になりました。普段からこういう問題でRx力を鍛えておくのはよさそうですね。
Javaの講義、試験が「自作関数を作り記述しなさい」って問題だったから
— てくも (@kumiromilk) March 9, 2016
「ズン」「ドコ」のいずれかをランダムで出力し続けて「ズン」「ズン」「ズン」「ズン」「ドコ」の配列が出たら「キ・ヨ・シ!」って出力した後終了って関数作ったら満点で単位貰ってた
今回作成したプログラムをPlaygroundの形でまとめておきましたので、よかったらどうぞ。
https://github.com/bricklife/Zundoko-Kiyoshi-RxSwift