Edited at

RxJava 勉強会 Week3

More than 1 year has passed since last update.



Schedulers



Schedulers


  • newThread()

  • computation()

  • io()

  • single()

  • trampoline()

  • from()



おさらい


  • SubscribeOn() と ObserveOn()



SubscribeOn()


  • Observable が動作する Scheduler を指定

  • Operator Chain のどこで指定してもよい



ObserveOn()


  • Observer が動作する Scheduler を指定

  • Operator Chain で指定した後の Operator に適用される



イメージ図



デフォルトの動作


  • SubscribeOn() や ObserveOn() を指定しないと、現在のスレッドで実行



サンプル0:コード

何も指定せずに subscribe を実行

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L54



サンプル0:実行結果

subscribe: main

map1-1: main
map2-1: main
map3-1: main
onNext-1: main
map1-2: main
map2-2: main
map3-2: main
onNext-2: main
map1-3: main
map2-3: main
map3-3: main
onNext-3: main



サンプル1:コード

サンプル0をバックグラウンドスレッドで実行

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L85



サンプル1:実行結果

subscribe: pool-1-thread-1

map1-1: pool-1-thread-1
map2-1: pool-1-thread-1
map3-1: pool-1-thread-1
onNext-1: pool-1-thread-1
map1-2: pool-1-thread-1
map2-2: pool-1-thread-1
map3-2: pool-1-thread-1
onNext-2: pool-1-thread-1
map1-3: pool-1-thread-1
map2-3: pool-1-thread-1
map3-3: pool-1-thread-1
onNext-3: pool-1-thread-1



newThread()



API ドキュメント


それぞれの処理単位で新しいスレッドを生成します。




サンプル2:コード

SubscribeOn() と ObserveOn() で newThread() を指定

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L95



サンプル2:実行結果

subscribe: RxNewThreadScheduler-1

map1-1: RxNewThreadScheduler-2
map1-2: RxNewThreadScheduler-2
map1-3: RxNewThreadScheduler-2
map2-1: RxNewThreadScheduler-3
map2-2: RxNewThreadScheduler-3
map2-3: RxNewThreadScheduler-3
map3-1: RxNewThreadScheduler-4
map3-2: RxNewThreadScheduler-4
map3-3: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-5
onNext: RxNewThreadScheduler-5
onNext: RxNewThreadScheduler-5



サンプル3:コード

途中の ObserveOn() をひとつ抜いてみる

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L133



サンプル3:実行結果

subscribe: RxNewThreadScheduler-1

map1-1: RxNewThreadScheduler-2
map2-1: RxNewThreadScheduler-2
map1-2: RxNewThreadScheduler-2
map2-2: RxNewThreadScheduler-2
map1-3: RxNewThreadScheduler-2
map3-1: RxNewThreadScheduler-3
map2-3: RxNewThreadScheduler-2
map3-2: RxNewThreadScheduler-3
map3-3: RxNewThreadScheduler-3
onNext: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-4



computation()



API ドキュメント


計算処理用の Scehduler を返します。イベントループ、コールバック処理、計算処理に利用できます。

IO 束縛な処理には利用しないでください。そういうケースでは io() を利用しましょう。




実装


  • スレッドプール内のスレッドを再利用する Scheduler

  • スレッド数は



    • System.getProperty("rx2.computation-threads") or Runtime.getRuntime().availableProcessors()





サンプル4:コード

スレッド数を2に指定して、computation を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171



サンプル4:実行結果

subscribe: RxComputationThreadPool-1

map1-1: RxComputationThreadPool-2
map1-2: RxComputationThreadPool-2
map1-3: RxComputationThreadPool-2
map2-1: RxComputationThreadPool-1
map2-2: RxComputationThreadPool-1
map2-3: RxComputationThreadPool-1
map3-1: RxComputationThreadPool-2
map3-2: RxComputationThreadPool-2
onNext: RxComputationThreadPool-1
onNext: RxComputationThreadPool-1
map3-3: RxComputationThreadPool-2
onNext: RxComputationThreadPool-1



io()



API ドキュメント


IO 束縛な処理用の Scheduler です。必要に応じてスレッド数が増えるスレッドプールで実装されています。IO ブロックする非同期処理に利用できます。

計算処理用には利用しないでください。代わりに computation() を使いましょう。




実装


  • 必要に応じて新しいスレッドを立ち上げる Scheduler

  • ただし、スレッドは処理終了後 60 秒はキャッシュスレッドとして保持される


    • もし次回処理が 60 秒以内ならキャッシュスレッドを再利用される





サンプル5:コード

io() を利用し、2回 Observable を実行。

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171



サンプル5:実行結果

subscribe: RxCachedThreadScheduler-1

map1-1: RxCachedThreadScheduler-2
map1-2: RxCachedThreadScheduler-2
map1-3: RxCachedThreadScheduler-2
map2-1: RxCachedThreadScheduler-3
map2-2: RxCachedThreadScheduler-3
map2-3: RxCachedThreadScheduler-3
map3-1: RxCachedThreadScheduler-4
map3-2: RxCachedThreadScheduler-4
map3-3: RxCachedThreadScheduler-4
onNext: RxCachedThreadScheduler-5
onNext: RxCachedThreadScheduler-5
onNext: RxCachedThreadScheduler-5

subscribe: RxCachedThreadScheduler-5
map1-1: RxCachedThreadScheduler-4
map1-2: RxCachedThreadScheduler-4
map2-1: RxCachedThreadScheduler-3
map1-3: RxCachedThreadScheduler-4
map2-2: RxCachedThreadScheduler-3
map3-1: RxCachedThreadScheduler-2
map2-3: RxCachedThreadScheduler-3
map3-2: RxCachedThreadScheduler-2
map3-3: RxCachedThreadScheduler-2
onNext: RxCachedThreadScheduler-1
onNext: RxCachedThreadScheduler-1
onNext: RxCachedThreadScheduler-1

=> 再利用されている



computation() vs io()



computation() vs io()

それぞれ破綻するケースを紹介


  • サンプル6:computation() 破綻ケース

  • サンプル7:io() 破綻ケース



サンプル6:コード

IO 束縛なネットワーク通信で computation() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171



サンプル6:実行結果

(実際に実行)



サンプル6Fixed:コード

サンプル6で io() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171



サンプル6Fixed:実行結果

(実際に実行)



サンプル7:コード

CPU 束縛な計算処理で io() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171



サンプル7:実行結果

(実際に実行)



サンプル7Fixed:コード

サンプル7でcomputation() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171



サンプル7Fixed:実行結果

(実際に実行)



single()



API ドキュメント


Scheduler インスタンスで管理される共有の1つのバックグラウンドスレッドを提供します。




サンプル8:コード

single() を使った Observable。

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171



サンプル8:実行結果

subscribe: RxSingleScheduler-1

map1-1: RxSingleScheduler-1
map1-2: RxSingleScheduler-1
map1-3: RxSingleScheduler-1
map2-1: RxSingleScheduler-1
map2-2: RxSingleScheduler-1
map2-3: RxSingleScheduler-1
map3-1: RxSingleScheduler-1
map3-2: RxSingleScheduler-1
map3-3: RxSingleScheduler-1
onNext: RxSingleScheduler-1
onNext: RxSingleScheduler-1
onNext: RxSingleScheduler-1

=> 通常時と実行順序が違うことに注意



trampoline()



API ドキュメント


現在の処理が終わった後に実行される、呼ばれたスレッドで動作するキューを提供します。




よく分からない。orz


  • 現在のスレッドで実行するだけ?

  • コード上は queue が存在しているのだが、使われていなさそう。



参考



  1. https://medium.com/@I_Love_Coding/rxjava-schedulers-trampoline-use-cases-283f6649cbf#.q1xlqvoh5


    • 一つのスレッドで実行したいなら single() で代用可能

    • この利用方法だと裏には回らない。




  2. http://qiita.com/amay077/items/89093d33cd4f32154a5f


    • RxJava1 の情報か?RxJava2 には Schedulers.immediate() がない。





サンプル9:コード

trampoline() 利用例

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171



サンプル9:実行結果

subscribe: main

map1-1: main
map2-1: main
map3-1: main
onNext: main
map1-2: main
map2-2: main
map3-2: main
onNext: main
map1-3: main
map2-3: main
map3-3: main
onNext: main



from()



API ドキュメント


java.util.concurrent.Executor を Scheduler に変換します




サンプル10:コード

Executors.newFixedThreadPool() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171



サンプル10:実行結果

subscribe: pool-1-thread-1

map1-1: pool-1-thread-2
map1-2: pool-1-thread-2
map1-3: pool-1-thread-2
map2-1: pool-1-thread-3
map2-2: pool-1-thread-3
map2-3: pool-1-thread-3
map3-1: pool-1-thread-4
map3-2: pool-1-thread-4
map3-3: pool-1-thread-4
onNext: pool-1-thread-1
onNext: pool-1-thread-1
onNext: pool-1-thread-1



おしまい



勉強会メモ


  • subscribeOn() を二回書くとどうなるか(サンプル11)


    • ⇒ 先に書いた subscribeOn() が適用される



  • subscribe() は複数回呼べるか?(サンプル12)


    • ⇒ 複数回呼べる