LoginSignup
3
4

More than 5 years have passed since last update.

RxJava 勉強会 Week3

Last updated at Posted at 2017-03-07
1 / 55

Schedulers


Schedulers

  • newThread()
  • computation()
  • io()
  • single()
  • trampoline()
  • from()

おさらい

  • SubscribeOn() と ObserveOn()

SubscribeOn()

  • Observable が動作する Scheduler を指定
  • Operator Chain のどこで指定してもよい

ObserveOn()

  • Observer が動作する Scheduler を指定
  • Operator Chain で指定した後の Operator に適用される

イメージ図


デフォルトの動作

  • SubscribeOn() や ObserveOn() を指定しないと、現在のスレッドで実行

サンプル0:コード

何も指定せずに subscribe を実行


サンプル0:実行結果

subscribe: main
map1-1: main
map2-1: main
map3-1: main
onNext-1: main
map1-2: main
map2-2: main
map3-2: main
onNext-2: main
map1-3: main
map2-3: main
map3-3: main
onNext-3: main

サンプル1:コード

サンプル0をバックグラウンドスレッドで実行


サンプル1:実行結果

subscribe: pool-1-thread-1
map1-1: pool-1-thread-1
map2-1: pool-1-thread-1
map3-1: pool-1-thread-1
onNext-1: pool-1-thread-1
map1-2: pool-1-thread-1
map2-2: pool-1-thread-1
map3-2: pool-1-thread-1
onNext-2: pool-1-thread-1
map1-3: pool-1-thread-1
map2-3: pool-1-thread-1
map3-3: pool-1-thread-1
onNext-3: pool-1-thread-1

newThread()


API ドキュメント

それぞれの処理単位で新しいスレッドを生成します。


サンプル2:コード

SubscribeOn() と ObserveOn() で newThread() を指定


サンプル2:実行結果

subscribe: RxNewThreadScheduler-1
map1-1: RxNewThreadScheduler-2
map1-2: RxNewThreadScheduler-2
map1-3: RxNewThreadScheduler-2
map2-1: RxNewThreadScheduler-3
map2-2: RxNewThreadScheduler-3
map2-3: RxNewThreadScheduler-3
map3-1: RxNewThreadScheduler-4
map3-2: RxNewThreadScheduler-4
map3-3: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-5
onNext: RxNewThreadScheduler-5
onNext: RxNewThreadScheduler-5

サンプル3:コード

途中の ObserveOn() をひとつ抜いてみる


サンプル3:実行結果

subscribe: RxNewThreadScheduler-1
map1-1: RxNewThreadScheduler-2
map2-1: RxNewThreadScheduler-2
map1-2: RxNewThreadScheduler-2
map2-2: RxNewThreadScheduler-2
map1-3: RxNewThreadScheduler-2
map3-1: RxNewThreadScheduler-3
map2-3: RxNewThreadScheduler-2
map3-2: RxNewThreadScheduler-3
map3-3: RxNewThreadScheduler-3
onNext: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-4

computation()


API ドキュメント

計算処理用の Scehduler を返します。イベントループ、コールバック処理、計算処理に利用できます。
IO 束縛な処理には利用しないでください。そういうケースでは io() を利用しましょう。


実装

  • スレッドプール内のスレッドを再利用する Scheduler
  • スレッド数は
    • System.getProperty("rx2.computation-threads") or Runtime.getRuntime().availableProcessors()

サンプル4:コード

スレッド数を2に指定して、computation を利用


サンプル4:実行結果

subscribe: RxComputationThreadPool-1
map1-1: RxComputationThreadPool-2
map1-2: RxComputationThreadPool-2
map1-3: RxComputationThreadPool-2
map2-1: RxComputationThreadPool-1
map2-2: RxComputationThreadPool-1
map2-3: RxComputationThreadPool-1
map3-1: RxComputationThreadPool-2
map3-2: RxComputationThreadPool-2
onNext: RxComputationThreadPool-1
onNext: RxComputationThreadPool-1
map3-3: RxComputationThreadPool-2
onNext: RxComputationThreadPool-1

io()


API ドキュメント

IO 束縛な処理用の Scheduler です。必要に応じてスレッド数が増えるスレッドプールで実装されています。IO ブロックする非同期処理に利用できます。
計算処理用には利用しないでください。代わりに computation() を使いましょう。


実装

  • 必要に応じて新しいスレッドを立ち上げる Scheduler
  • ただし、スレッドは処理終了後 60 秒はキャッシュスレッドとして保持される
    • もし次回処理が 60 秒以内ならキャッシュスレッドを再利用される

サンプル5:コード

io() を利用し、2回 Observable を実行。


サンプル5:実行結果

subscribe: RxCachedThreadScheduler-1
map1-1: RxCachedThreadScheduler-2
map1-2: RxCachedThreadScheduler-2
map1-3: RxCachedThreadScheduler-2
map2-1: RxCachedThreadScheduler-3
map2-2: RxCachedThreadScheduler-3
map2-3: RxCachedThreadScheduler-3
map3-1: RxCachedThreadScheduler-4
map3-2: RxCachedThreadScheduler-4
map3-3: RxCachedThreadScheduler-4
onNext: RxCachedThreadScheduler-5
onNext: RxCachedThreadScheduler-5
onNext: RxCachedThreadScheduler-5

subscribe: RxCachedThreadScheduler-5
map1-1: RxCachedThreadScheduler-4
map1-2: RxCachedThreadScheduler-4
map2-1: RxCachedThreadScheduler-3
map1-3: RxCachedThreadScheduler-4
map2-2: RxCachedThreadScheduler-3
map3-1: RxCachedThreadScheduler-2
map2-3: RxCachedThreadScheduler-3
map3-2: RxCachedThreadScheduler-2
map3-3: RxCachedThreadScheduler-2
onNext: RxCachedThreadScheduler-1
onNext: RxCachedThreadScheduler-1
onNext: RxCachedThreadScheduler-1

=> 再利用されている


computation() vs io()


computation() vs io()

それぞれ破綻するケースを紹介

  • サンプル6:computation() 破綻ケース
  • サンプル7:io() 破綻ケース

サンプル6:コード

IO 束縛なネットワーク通信で computation() を利用


サンプル6:実行結果

(実際に実行)

サンプル6Fixed:コード

サンプル6で io() を利用


サンプル6Fixed:実行結果

(実際に実行)

サンプル7:コード

CPU 束縛な計算処理で io() を利用


サンプル7:実行結果

(実際に実行)

サンプル7Fixed:コード

サンプル7でcomputation() を利用


サンプル7Fixed:実行結果

(実際に実行)

single()


API ドキュメント

Scheduler インスタンスで管理される共有の1つのバックグラウンドスレッドを提供します。


サンプル8:コード

single() を使った Observable。


サンプル8:実行結果

subscribe: RxSingleScheduler-1
map1-1: RxSingleScheduler-1
map1-2: RxSingleScheduler-1
map1-3: RxSingleScheduler-1
map2-1: RxSingleScheduler-1
map2-2: RxSingleScheduler-1
map2-3: RxSingleScheduler-1
map3-1: RxSingleScheduler-1
map3-2: RxSingleScheduler-1
map3-3: RxSingleScheduler-1
onNext: RxSingleScheduler-1
onNext: RxSingleScheduler-1
onNext: RxSingleScheduler-1

=> 通常時と実行順序が違うことに注意


trampoline()


API ドキュメント

現在の処理が終わった後に実行される、呼ばれたスレッドで動作するキューを提供します。


よく分からない。orz

  • 現在のスレッドで実行するだけ?
  • コード上は queue が存在しているのだが、使われていなさそう。

参考

  1. https://medium.com/@I_Love_Coding/rxjava-schedulers-trampoline-use-cases-283f6649cbf#.q1xlqvoh5
    • 一つのスレッドで実行したいなら single() で代用可能
    • この利用方法だと裏には回らない。
  2. http://qiita.com/amay077/items/89093d33cd4f32154a5f
    • RxJava1 の情報か?RxJava2 には Schedulers.immediate() がない。

サンプル9:コード

trampoline() 利用例


サンプル9:実行結果

subscribe: main
map1-1: main
map2-1: main
map3-1: main
onNext: main
map1-2: main
map2-2: main
map3-2: main
onNext: main
map1-3: main
map2-3: main
map3-3: main
onNext: main

from()


API ドキュメント

java.util.concurrent.Executor を Scheduler に変換します


サンプル10:コード

Executors.newFixedThreadPool() を利用


サンプル10:実行結果

subscribe: pool-1-thread-1
map1-1: pool-1-thread-2
map1-2: pool-1-thread-2
map1-3: pool-1-thread-2
map2-1: pool-1-thread-3
map2-2: pool-1-thread-3
map2-3: pool-1-thread-3
map3-1: pool-1-thread-4
map3-2: pool-1-thread-4
map3-3: pool-1-thread-4
onNext: pool-1-thread-1
onNext: pool-1-thread-1
onNext: pool-1-thread-1

おしまい


勉強会メモ

  • subscribeOn() を二回書くとどうなるか(サンプル11)
    • ⇒ 先に書いた subscribeOn() が適用される
  • subscribe() は複数回呼べるか?(サンプル12)
    • ⇒ 複数回呼べる
3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4