Schedulers
Schedulers
- newThread()
- computation()
- io()
- single()
- trampoline()
- from()
おさらい
- SubscribeOn() と ObserveOn()
SubscribeOn()
- Observable が動作する Scheduler を指定
- Operator Chain のどこで指定してもよい
ObserveOn()
- Observer が動作する Scheduler を指定
- Operator Chain で指定した後の Operator に適用される
イメージ図

デフォルトの動作
- SubscribeOn() や ObserveOn() を指定しないと、現在のスレッドで実行
サンプル0:コード
何も指定せずに subscribe を実行
サンプル0:実行結果
subscribe: main
map1-1: main
map2-1: main
map3-1: main
onNext-1: main
map1-2: main
map2-2: main
map3-2: main
onNext-2: main
map1-3: main
map2-3: main
map3-3: main
onNext-3: main
サンプル1:コード
サンプル0をバックグラウンドスレッドで実行
サンプル1:実行結果
subscribe: pool-1-thread-1
map1-1: pool-1-thread-1
map2-1: pool-1-thread-1
map3-1: pool-1-thread-1
onNext-1: pool-1-thread-1
map1-2: pool-1-thread-1
map2-2: pool-1-thread-1
map3-2: pool-1-thread-1
onNext-2: pool-1-thread-1
map1-3: pool-1-thread-1
map2-3: pool-1-thread-1
map3-3: pool-1-thread-1
onNext-3: pool-1-thread-1
newThread()
API ドキュメント
それぞれの処理単位で新しいスレッドを生成します。
サンプル2:コード
SubscribeOn() と ObserveOn() で newThread() を指定
サンプル2:実行結果
subscribe: RxNewThreadScheduler-1
map1-1: RxNewThreadScheduler-2
map1-2: RxNewThreadScheduler-2
map1-3: RxNewThreadScheduler-2
map2-1: RxNewThreadScheduler-3
map2-2: RxNewThreadScheduler-3
map2-3: RxNewThreadScheduler-3
map3-1: RxNewThreadScheduler-4
map3-2: RxNewThreadScheduler-4
map3-3: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-5
onNext: RxNewThreadScheduler-5
onNext: RxNewThreadScheduler-5
サンプル3:コード
途中の ObserveOn() をひとつ抜いてみる
サンプル3:実行結果
subscribe: RxNewThreadScheduler-1
map1-1: RxNewThreadScheduler-2
map2-1: RxNewThreadScheduler-2
map1-2: RxNewThreadScheduler-2
map2-2: RxNewThreadScheduler-2
map1-3: RxNewThreadScheduler-2
map3-1: RxNewThreadScheduler-3
map2-3: RxNewThreadScheduler-2
map3-2: RxNewThreadScheduler-3
map3-3: RxNewThreadScheduler-3
onNext: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-4
computation()
API ドキュメント
計算処理用の Scehduler を返します。イベントループ、コールバック処理、計算処理に利用できます。
IO 束縛な処理には利用しないでください。そういうケースでは io() を利用しましょう。
実装
- スレッドプール内のスレッドを再利用する Scheduler
- スレッド数は
-
System.getProperty("rx2.computation-threads")
orRuntime.getRuntime().availableProcessors()
-
サンプル4:コード
スレッド数を2に指定して、computation を利用
サンプル4:実行結果
subscribe: RxComputationThreadPool-1
map1-1: RxComputationThreadPool-2
map1-2: RxComputationThreadPool-2
map1-3: RxComputationThreadPool-2
map2-1: RxComputationThreadPool-1
map2-2: RxComputationThreadPool-1
map2-3: RxComputationThreadPool-1
map3-1: RxComputationThreadPool-2
map3-2: RxComputationThreadPool-2
onNext: RxComputationThreadPool-1
onNext: RxComputationThreadPool-1
map3-3: RxComputationThreadPool-2
onNext: RxComputationThreadPool-1
io()
API ドキュメント
IO 束縛な処理用の Scheduler です。必要に応じてスレッド数が増えるスレッドプールで実装されています。IO ブロックする非同期処理に利用できます。
計算処理用には利用しないでください。代わりに computation() を使いましょう。
実装
- 必要に応じて新しいスレッドを立ち上げる Scheduler
- ただし、スレッドは処理終了後 60 秒はキャッシュスレッドとして保持される
- もし次回処理が 60 秒以内ならキャッシュスレッドを再利用される
サンプル5:コード
io() を利用し、2回 Observable を実行。
サンプル5:実行結果
subscribe: RxCachedThreadScheduler-1
map1-1: RxCachedThreadScheduler-2
map1-2: RxCachedThreadScheduler-2
map1-3: RxCachedThreadScheduler-2
map2-1: RxCachedThreadScheduler-3
map2-2: RxCachedThreadScheduler-3
map2-3: RxCachedThreadScheduler-3
map3-1: RxCachedThreadScheduler-4
map3-2: RxCachedThreadScheduler-4
map3-3: RxCachedThreadScheduler-4
onNext: RxCachedThreadScheduler-5
onNext: RxCachedThreadScheduler-5
onNext: RxCachedThreadScheduler-5
subscribe: RxCachedThreadScheduler-5
map1-1: RxCachedThreadScheduler-4
map1-2: RxCachedThreadScheduler-4
map2-1: RxCachedThreadScheduler-3
map1-3: RxCachedThreadScheduler-4
map2-2: RxCachedThreadScheduler-3
map3-1: RxCachedThreadScheduler-2
map2-3: RxCachedThreadScheduler-3
map3-2: RxCachedThreadScheduler-2
map3-3: RxCachedThreadScheduler-2
onNext: RxCachedThreadScheduler-1
onNext: RxCachedThreadScheduler-1
onNext: RxCachedThreadScheduler-1
=> 再利用されている
computation() vs io()
computation() vs io()
それぞれ破綻するケースを紹介
- サンプル6:computation() 破綻ケース
- サンプル7:io() 破綻ケース
サンプル6:コード
IO 束縛なネットワーク通信で computation() を利用
サンプル6:実行結果
(実際に実行)
サンプル6Fixed:コード
サンプル6で io() を利用
サンプル6Fixed:実行結果
(実際に実行)
サンプル7:コード
CPU 束縛な計算処理で io() を利用
サンプル7:実行結果
(実際に実行)
サンプル7Fixed:コード
サンプル7でcomputation() を利用
サンプル7Fixed:実行結果
(実際に実行)
single()
API ドキュメント
Scheduler インスタンスで管理される共有の1つのバックグラウンドスレッドを提供します。
サンプル8:コード
single() を使った Observable。
サンプル8:実行結果
subscribe: RxSingleScheduler-1
map1-1: RxSingleScheduler-1
map1-2: RxSingleScheduler-1
map1-3: RxSingleScheduler-1
map2-1: RxSingleScheduler-1
map2-2: RxSingleScheduler-1
map2-3: RxSingleScheduler-1
map3-1: RxSingleScheduler-1
map3-2: RxSingleScheduler-1
map3-3: RxSingleScheduler-1
onNext: RxSingleScheduler-1
onNext: RxSingleScheduler-1
onNext: RxSingleScheduler-1
=> 通常時と実行順序が違うことに注意
trampoline()
API ドキュメント
現在の処理が終わった後に実行される、呼ばれたスレッドで動作するキューを提供します。
よく分からない。orz
- 現在のスレッドで実行するだけ?
- コード上は queue が存在しているのだが、使われていなさそう。
参考
- 一つのスレッドで実行したいなら single() で代用可能
- この利用方法だと裏には回らない。
- RxJava1 の情報か?RxJava2 には Schedulers.immediate() がない。
サンプル9:コード
trampoline() 利用例
サンプル9:実行結果
subscribe: main
map1-1: main
map2-1: main
map3-1: main
onNext: main
map1-2: main
map2-2: main
map3-2: main
onNext: main
map1-3: main
map2-3: main
map3-3: main
onNext: main
from()
API ドキュメント
java.util.concurrent.Executor を Scheduler に変換します
サンプル10:コード
Executors.newFixedThreadPool() を利用
サンプル10:実行結果
subscribe: pool-1-thread-1
map1-1: pool-1-thread-2
map1-2: pool-1-thread-2
map1-3: pool-1-thread-2
map2-1: pool-1-thread-3
map2-2: pool-1-thread-3
map2-3: pool-1-thread-3
map3-1: pool-1-thread-4
map3-2: pool-1-thread-4
map3-3: pool-1-thread-4
onNext: pool-1-thread-1
onNext: pool-1-thread-1
onNext: pool-1-thread-1
おしまい
勉強会メモ
- subscribeOn() を二回書くとどうなるか(サンプル11)
- ⇒ 先に書いた subscribeOn() が適用される
- subscribe() は複数回呼べるか?(サンプル12)
- ⇒ 複数回呼べる