RxJava Advent Calendarの初日は実行順序について書きます。サンプルを用意したのでコードと実行結果を確認してください。
// サンプル1
Observable
.just(1, 2, 3, 4, 5)
.flatMap(it ->
Observable
.just(it)
.delay(5 - it, TimeUnit.SECONDS, Schedulers.io())
)
.subscribe(
it -> Log.d(TAG, "num:" + it)
);
11-21 21:28:02.470 3161-3186/com.kazy.rxjavaplayground D/MainActivity: num:5
11-21 21:28:03.470 3161-3185/com.kazy.rxjavaplayground D/MainActivity: num:4
11-21 21:28:04.471 3161-3184/com.kazy.rxjavaplayground D/MainActivity: num:3
11-21 21:28:05.458 3161-3183/com.kazy.rxjavaplayground D/MainActivity: num:2
11-21 21:28:06.458 3161-3182/com.kazy.rxjavaplayground D/MainActivity: num:1
上の例で注目したいのは入力順が{1,2,3,4,5}に対して、出力順が{5,4,3,2,1}な点についてです。なぜ順番通りに処理が行われないのでしょうか。このエントリでは上の例を元にストリームの実行順序について考えてみたいと思います。
上記の例(サンプル1)ではdelayの処理がブラックボックスの状態です。もしかしたらObservable#delay
の実装にヒントがあるかもしれません。しかしRxJavaのオペレータの実装を理解するのはやや大変なので今回は諦めます。その代わりにサンプル1を自前で実装したdelayに置き換えたサンプル2という実装を用意しましたので動かしてみます。
// サンプル2
Observable
.just(1, 2, 3, 4, 5)
.flatMap(it -> delay(it))
.subscribeOn(Schedulers.io())
.subscribe(
it -> Log.d(TAG, "num:" + it)
);
private Observable<Long> delay(long num) {
return Observable.fromEmitter(emitter -> {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(5 - num));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
emitter.onNext(num);
emitter.onCompleted();
}, Emitter.BackpressureMode.NONE);
}
11-21 21:33:51.378 7933-7950/com.kazy.rxjavaplayground D/MainActivity: num:1
11-21 21:33:54.381 7933-7950/com.kazy.rxjavaplayground D/MainActivity: num:2
11-21 21:33:56.383 7933-7950/com.kazy.rxjavaplayground D/MainActivity: num:3
11-21 21:33:57.386 7933-7950/com.kazy.rxjavaplayground D/MainActivity: num:4
11-21 21:33:57.386 7933-7950/com.kazy.rxjavaplayground D/MainActivity: num:5
今度は出力が{1,2,3,4,5}の順番で並びました。なぜ出力結果が変わってしまったのでしょうか、答えはスケジューラの使い方にあります。スケジューラはRxのストリームを簡単に複数スレッドで動かすため仕組みです。
例えばサンプル2の場合subscribeOn(Schedulers.io())
をメソッドチェーンに噛ませることで処理全体がSchedulers.io()
が提供するworkerThreadで動きます。 余談ですがsubscribeOn(Schedulers.io())
の処理がない場合呼び出し元のスレッドでRxの処理が動きます。またSchedulers.io()
は呼び出される度にサイズが無限のスレッドプールから一つスレッドを渡してくれるスケジューラです。サンプル2はworkerThread上で動作しながらも全ての処理が同じスレッドで動いているので出力は入力と同じ順番で返ってきました。
ここまでくると大体分かってくると思いますが、実はサンプル1はflatMap内でdelayが呼び出される度にSchedulers.io()
から新しいスレッドが割り与えられます。
結果として{1,2,3,4,5}の5つのitemは全てがほぼ並列に動作することになり、delay時間が短い順に出力が並びます。サンプル2の処理もdelayの呼び出しの度に新しいスレッドを割り当てるように実装出来れば、サンプル1と同じ挙動になるはずです。サンプル2に手を加えてサンプル1と同じ挙動になるように修正したものをサンプル3とします。
// サンプル3
Observable
.just(1, 2, 3, 4, 5)
.flatMap(it -> delay(it).subscribeOn(Schedulers.io()))
.subscribe(
it -> Log.d(TAG, "num:" + it)
);
private Observable<Long> delay(long num) {
return Observable.fromEmitter(emitter -> {
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(5 - num));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
emitter.onNext(num);
emitter.onCompleted();
}, Emitter.BackpressureMode.NONE);
}
11-21 23:48:34.831 27251-27285/com.kazy.rxjavaplayground D/MainActivity: num:5
11-21 23:48:35.833 27251-27284/com.kazy.rxjavaplayground D/MainActivity: num:4
11-21 23:48:36.832 27251-27283/com.kazy.rxjavaplayground D/MainActivity: num:3
11-21 23:48:37.834 27251-27282/com.kazy.rxjavaplayground D/MainActivity: num:2
11-21 23:48:38.833 27251-27281/com.kazy.rxjavaplayground D/MainActivity: num:1
上手くいきました。最後にサンプル1の実装に手を加えて実行順序を保証したい場合はどうするのが正解でしょうか? この場合concatMap
を利用するのが適切です。
// サンプル4
Observable
.just(1, 2, 3, 4, 5)
.concatMap(it ->
Observable
.just(it)
.delay(5 - it, TimeUnit.SECONDS, Schedulers.io())
)
.subscribe(
it -> Log.d(TAG, "num:" + it)
);
4つのサンプルからわかったことをまとめると
- flatMapで処理を繋げた場合に実行順序の保証はない
- flatMap内は意図せず並列実行される可能性がある
- 処理を連結し、かつ実行順序を保証したい場合は
concatMap
を利用すべき
いかがでしたでしょうか、Rxjavaの実行順序が直感と反する簡単な例を紹介しました。もし「実行順序について②」を書くことが出来たら、やめた方がいいObservableの作り方、内部でスレッドが作成されるObservableの挙動、backpuressureなどに触れたいと思います。