RxJava
RxJava2

RxJava 勉強会 Week3

More than 1 year has passed since last update.

Schedulers


Schedulers

  • newThread()
  • computation()
  • io()
  • single()
  • trampoline()
  • from()

おさらい

  • SubscribeOn() と ObserveOn()

SubscribeOn()

  • Observable が動作する Scheduler を指定
  • Operator Chain のどこで指定してもよい

ObserveOn()

  • Observer が動作する Scheduler を指定
  • Operator Chain で指定した後の Operator に適用される

イメージ図


デフォルトの動作

  • SubscribeOn() や ObserveOn() を指定しないと、現在のスレッドで実行

サンプル0:コード

何も指定せずに subscribe を実行

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L54


サンプル0:実行結果

subscribe: main
map1-1: main
map2-1: main
map3-1: main
onNext-1: main
map1-2: main
map2-2: main
map3-2: main
onNext-2: main
map1-3: main
map2-3: main
map3-3: main
onNext-3: main

サンプル1:コード

サンプル0をバックグラウンドスレッドで実行

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L85


サンプル1:実行結果

subscribe: pool-1-thread-1
map1-1: pool-1-thread-1
map2-1: pool-1-thread-1
map3-1: pool-1-thread-1
onNext-1: pool-1-thread-1
map1-2: pool-1-thread-1
map2-2: pool-1-thread-1
map3-2: pool-1-thread-1
onNext-2: pool-1-thread-1
map1-3: pool-1-thread-1
map2-3: pool-1-thread-1
map3-3: pool-1-thread-1
onNext-3: pool-1-thread-1

newThread()


API ドキュメント

それぞれの処理単位で新しいスレッドを生成します。


サンプル2:コード

SubscribeOn() と ObserveOn() で newThread() を指定

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L95


サンプル2:実行結果

subscribe: RxNewThreadScheduler-1
map1-1: RxNewThreadScheduler-2
map1-2: RxNewThreadScheduler-2
map1-3: RxNewThreadScheduler-2
map2-1: RxNewThreadScheduler-3
map2-2: RxNewThreadScheduler-3
map2-3: RxNewThreadScheduler-3
map3-1: RxNewThreadScheduler-4
map3-2: RxNewThreadScheduler-4
map3-3: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-5
onNext: RxNewThreadScheduler-5
onNext: RxNewThreadScheduler-5

サンプル3:コード

途中の ObserveOn() をひとつ抜いてみる

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L133


サンプル3:実行結果

subscribe: RxNewThreadScheduler-1
map1-1: RxNewThreadScheduler-2
map2-1: RxNewThreadScheduler-2
map1-2: RxNewThreadScheduler-2
map2-2: RxNewThreadScheduler-2
map1-3: RxNewThreadScheduler-2
map3-1: RxNewThreadScheduler-3
map2-3: RxNewThreadScheduler-2
map3-2: RxNewThreadScheduler-3
map3-3: RxNewThreadScheduler-3
onNext: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-4

computation()


API ドキュメント

計算処理用の Scehduler を返します。イベントループ、コールバック処理、計算処理に利用できます。
IO 束縛な処理には利用しないでください。そういうケースでは io() を利用しましょう。


実装

  • スレッドプール内のスレッドを再利用する Scheduler
  • スレッド数は
    • System.getProperty("rx2.computation-threads") or Runtime.getRuntime().availableProcessors()

サンプル4:コード

スレッド数を2に指定して、computation を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル4:実行結果

subscribe: RxComputationThreadPool-1
map1-1: RxComputationThreadPool-2
map1-2: RxComputationThreadPool-2
map1-3: RxComputationThreadPool-2
map2-1: RxComputationThreadPool-1
map2-2: RxComputationThreadPool-1
map2-3: RxComputationThreadPool-1
map3-1: RxComputationThreadPool-2
map3-2: RxComputationThreadPool-2
onNext: RxComputationThreadPool-1
onNext: RxComputationThreadPool-1
map3-3: RxComputationThreadPool-2
onNext: RxComputationThreadPool-1

io()


API ドキュメント

IO 束縛な処理用の Scheduler です。必要に応じてスレッド数が増えるスレッドプールで実装されています。IO ブロックする非同期処理に利用できます。
計算処理用には利用しないでください。代わりに computation() を使いましょう。


実装

  • 必要に応じて新しいスレッドを立ち上げる Scheduler
  • ただし、スレッドは処理終了後 60 秒はキャッシュスレッドとして保持される
    • もし次回処理が 60 秒以内ならキャッシュスレッドを再利用される

サンプル5:コード

io() を利用し、2回 Observable を実行。

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル5:実行結果

subscribe: RxCachedThreadScheduler-1
map1-1: RxCachedThreadScheduler-2
map1-2: RxCachedThreadScheduler-2
map1-3: RxCachedThreadScheduler-2
map2-1: RxCachedThreadScheduler-3
map2-2: RxCachedThreadScheduler-3
map2-3: RxCachedThreadScheduler-3
map3-1: RxCachedThreadScheduler-4
map3-2: RxCachedThreadScheduler-4
map3-3: RxCachedThreadScheduler-4
onNext: RxCachedThreadScheduler-5
onNext: RxCachedThreadScheduler-5
onNext: RxCachedThreadScheduler-5

subscribe: RxCachedThreadScheduler-5
map1-1: RxCachedThreadScheduler-4
map1-2: RxCachedThreadScheduler-4
map2-1: RxCachedThreadScheduler-3
map1-3: RxCachedThreadScheduler-4
map2-2: RxCachedThreadScheduler-3
map3-1: RxCachedThreadScheduler-2
map2-3: RxCachedThreadScheduler-3
map3-2: RxCachedThreadScheduler-2
map3-3: RxCachedThreadScheduler-2
onNext: RxCachedThreadScheduler-1
onNext: RxCachedThreadScheduler-1
onNext: RxCachedThreadScheduler-1

=> 再利用されている


computation() vs io()


computation() vs io()

それぞれ破綻するケースを紹介

  • サンプル6:computation() 破綻ケース
  • サンプル7:io() 破綻ケース

サンプル6:コード

IO 束縛なネットワーク通信で computation() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル6:実行結果

(実際に実行)

サンプル6Fixed:コード

サンプル6で io() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル6Fixed:実行結果

(実際に実行)

サンプル7:コード

CPU 束縛な計算処理で io() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル7:実行結果

(実際に実行)

サンプル7Fixed:コード

サンプル7でcomputation() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル7Fixed:実行結果

(実際に実行)

single()


API ドキュメント

Scheduler インスタンスで管理される共有の1つのバックグラウンドスレッドを提供します。


サンプル8:コード

single() を使った Observable。

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル8:実行結果

subscribe: RxSingleScheduler-1
map1-1: RxSingleScheduler-1
map1-2: RxSingleScheduler-1
map1-3: RxSingleScheduler-1
map2-1: RxSingleScheduler-1
map2-2: RxSingleScheduler-1
map2-3: RxSingleScheduler-1
map3-1: RxSingleScheduler-1
map3-2: RxSingleScheduler-1
map3-3: RxSingleScheduler-1
onNext: RxSingleScheduler-1
onNext: RxSingleScheduler-1
onNext: RxSingleScheduler-1

=> 通常時と実行順序が違うことに注意


trampoline()


API ドキュメント

現在の処理が終わった後に実行される、呼ばれたスレッドで動作するキューを提供します。


よく分からない。orz

  • 現在のスレッドで実行するだけ?
  • コード上は queue が存在しているのだが、使われていなさそう。

参考

  1. https://medium.com/@I_Love_Coding/rxjava-schedulers-trampoline-use-cases-283f6649cbf#.q1xlqvoh5
    • 一つのスレッドで実行したいなら single() で代用可能
    • この利用方法だと裏には回らない。
  2. http://qiita.com/amay077/items/89093d33cd4f32154a5f
    • RxJava1 の情報か?RxJava2 には Schedulers.immediate() がない。

サンプル9:コード

trampoline() 利用例

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル9:実行結果

subscribe: main
map1-1: main
map2-1: main
map3-1: main
onNext: main
map1-2: main
map2-2: main
map3-2: main
onNext: main
map1-3: main
map2-3: main
map3-3: main
onNext: main

from()


API ドキュメント

java.util.concurrent.Executor を Scheduler に変換します


サンプル10:コード

Executors.newFixedThreadPool() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル10:実行結果

subscribe: pool-1-thread-1
map1-1: pool-1-thread-2
map1-2: pool-1-thread-2
map1-3: pool-1-thread-2
map2-1: pool-1-thread-3
map2-2: pool-1-thread-3
map2-3: pool-1-thread-3
map3-1: pool-1-thread-4
map3-2: pool-1-thread-4
map3-3: pool-1-thread-4
onNext: pool-1-thread-1
onNext: pool-1-thread-1
onNext: pool-1-thread-1

おしまい


勉強会メモ

  • subscribeOn() を二回書くとどうなるか(サンプル11)
    • ⇒ 先に書いた subscribeOn() が適用される
  • subscribe() は複数回呼べるか?(サンプル12)
    • ⇒ 複数回呼べる