Help us understand the problem. What is going on with this article?

RxJava 勉強会 Week3

More than 3 years have passed since last update.

RxJava 勉強会 Week3

by Shiozawa
1 / 55

Schedulers


Schedulers

  • newThread()
  • computation()
  • io()
  • single()
  • trampoline()
  • from()

おさらい

  • SubscribeOn() と ObserveOn()

SubscribeOn()

  • Observable が動作する Scheduler を指定
  • Operator Chain のどこで指定してもよい

ObserveOn()

  • Observer が動作する Scheduler を指定
  • Operator Chain で指定した後の Operator に適用される

イメージ図


デフォルトの動作

  • SubscribeOn() や ObserveOn() を指定しないと、現在のスレッドで実行

サンプル0:コード

何も指定せずに subscribe を実行

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L54


サンプル0:実行結果

subscribe: main
map1-1: main
map2-1: main
map3-1: main
onNext-1: main
map1-2: main
map2-2: main
map3-2: main
onNext-2: main
map1-3: main
map2-3: main
map3-3: main
onNext-3: main

サンプル1:コード

サンプル0をバックグラウンドスレッドで実行

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L85


サンプル1:実行結果

subscribe: pool-1-thread-1
map1-1: pool-1-thread-1
map2-1: pool-1-thread-1
map3-1: pool-1-thread-1
onNext-1: pool-1-thread-1
map1-2: pool-1-thread-1
map2-2: pool-1-thread-1
map3-2: pool-1-thread-1
onNext-2: pool-1-thread-1
map1-3: pool-1-thread-1
map2-3: pool-1-thread-1
map3-3: pool-1-thread-1
onNext-3: pool-1-thread-1

newThread()


API ドキュメント

それぞれの処理単位で新しいスレッドを生成します。


サンプル2:コード

SubscribeOn() と ObserveOn() で newThread() を指定

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L95


サンプル2:実行結果

subscribe: RxNewThreadScheduler-1
map1-1: RxNewThreadScheduler-2
map1-2: RxNewThreadScheduler-2
map1-3: RxNewThreadScheduler-2
map2-1: RxNewThreadScheduler-3
map2-2: RxNewThreadScheduler-3
map2-3: RxNewThreadScheduler-3
map3-1: RxNewThreadScheduler-4
map3-2: RxNewThreadScheduler-4
map3-3: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-5
onNext: RxNewThreadScheduler-5
onNext: RxNewThreadScheduler-5

サンプル3:コード

途中の ObserveOn() をひとつ抜いてみる

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L133


サンプル3:実行結果

subscribe: RxNewThreadScheduler-1
map1-1: RxNewThreadScheduler-2
map2-1: RxNewThreadScheduler-2
map1-2: RxNewThreadScheduler-2
map2-2: RxNewThreadScheduler-2
map1-3: RxNewThreadScheduler-2
map3-1: RxNewThreadScheduler-3
map2-3: RxNewThreadScheduler-2
map3-2: RxNewThreadScheduler-3
map3-3: RxNewThreadScheduler-3
onNext: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-4
onNext: RxNewThreadScheduler-4

computation()


API ドキュメント

計算処理用の Scehduler を返します。イベントループ、コールバック処理、計算処理に利用できます。
IO 束縛な処理には利用しないでください。そういうケースでは io() を利用しましょう。


実装

  • スレッドプール内のスレッドを再利用する Scheduler
  • スレッド数は
    • System.getProperty("rx2.computation-threads") or Runtime.getRuntime().availableProcessors()

サンプル4:コード

スレッド数を2に指定して、computation を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル4:実行結果

subscribe: RxComputationThreadPool-1
map1-1: RxComputationThreadPool-2
map1-2: RxComputationThreadPool-2
map1-3: RxComputationThreadPool-2
map2-1: RxComputationThreadPool-1
map2-2: RxComputationThreadPool-1
map2-3: RxComputationThreadPool-1
map3-1: RxComputationThreadPool-2
map3-2: RxComputationThreadPool-2
onNext: RxComputationThreadPool-1
onNext: RxComputationThreadPool-1
map3-3: RxComputationThreadPool-2
onNext: RxComputationThreadPool-1

io()


API ドキュメント

IO 束縛な処理用の Scheduler です。必要に応じてスレッド数が増えるスレッドプールで実装されています。IO ブロックする非同期処理に利用できます。
計算処理用には利用しないでください。代わりに computation() を使いましょう。


実装

  • 必要に応じて新しいスレッドを立ち上げる Scheduler
  • ただし、スレッドは処理終了後 60 秒はキャッシュスレッドとして保持される
    • もし次回処理が 60 秒以内ならキャッシュスレッドを再利用される

サンプル5:コード

io() を利用し、2回 Observable を実行。

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル5:実行結果

subscribe: RxCachedThreadScheduler-1
map1-1: RxCachedThreadScheduler-2
map1-2: RxCachedThreadScheduler-2
map1-3: RxCachedThreadScheduler-2
map2-1: RxCachedThreadScheduler-3
map2-2: RxCachedThreadScheduler-3
map2-3: RxCachedThreadScheduler-3
map3-1: RxCachedThreadScheduler-4
map3-2: RxCachedThreadScheduler-4
map3-3: RxCachedThreadScheduler-4
onNext: RxCachedThreadScheduler-5
onNext: RxCachedThreadScheduler-5
onNext: RxCachedThreadScheduler-5

subscribe: RxCachedThreadScheduler-5
map1-1: RxCachedThreadScheduler-4
map1-2: RxCachedThreadScheduler-4
map2-1: RxCachedThreadScheduler-3
map1-3: RxCachedThreadScheduler-4
map2-2: RxCachedThreadScheduler-3
map3-1: RxCachedThreadScheduler-2
map2-3: RxCachedThreadScheduler-3
map3-2: RxCachedThreadScheduler-2
map3-3: RxCachedThreadScheduler-2
onNext: RxCachedThreadScheduler-1
onNext: RxCachedThreadScheduler-1
onNext: RxCachedThreadScheduler-1

=> 再利用されている


computation() vs io()


computation() vs io()

それぞれ破綻するケースを紹介

  • サンプル6:computation() 破綻ケース
  • サンプル7:io() 破綻ケース

サンプル6:コード

IO 束縛なネットワーク通信で computation() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル6:実行結果

(実際に実行)

サンプル6Fixed:コード

サンプル6で io() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル6Fixed:実行結果

(実際に実行)

サンプル7:コード

CPU 束縛な計算処理で io() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル7:実行結果

(実際に実行)

サンプル7Fixed:コード

サンプル7でcomputation() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル7Fixed:実行結果

(実際に実行)

single()


API ドキュメント

Scheduler インスタンスで管理される共有の1つのバックグラウンドスレッドを提供します。


サンプル8:コード

single() を使った Observable。

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル8:実行結果

subscribe: RxSingleScheduler-1
map1-1: RxSingleScheduler-1
map1-2: RxSingleScheduler-1
map1-3: RxSingleScheduler-1
map2-1: RxSingleScheduler-1
map2-2: RxSingleScheduler-1
map2-3: RxSingleScheduler-1
map3-1: RxSingleScheduler-1
map3-2: RxSingleScheduler-1
map3-3: RxSingleScheduler-1
onNext: RxSingleScheduler-1
onNext: RxSingleScheduler-1
onNext: RxSingleScheduler-1

=> 通常時と実行順序が違うことに注意


trampoline()


API ドキュメント

現在の処理が終わった後に実行される、呼ばれたスレッドで動作するキューを提供します。


よく分からない。orz

  • 現在のスレッドで実行するだけ?
  • コード上は queue が存在しているのだが、使われていなさそう。

参考

  1. https://medium.com/@I_Love_Coding/rxjava-schedulers-trampoline-use-cases-283f6649cbf#.q1xlqvoh5
    • 一つのスレッドで実行したいなら single() で代用可能
    • この利用方法だと裏には回らない。
  2. http://qiita.com/amay077/items/89093d33cd4f32154a5f
    • RxJava1 の情報か?RxJava2 には Schedulers.immediate() がない。

サンプル9:コード

trampoline() 利用例

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル9:実行結果

subscribe: main
map1-1: main
map2-1: main
map3-1: main
onNext: main
map1-2: main
map2-2: main
map3-2: main
onNext: main
map1-3: main
map2-3: main
map3-3: main
onNext: main

from()


API ドキュメント

java.util.concurrent.Executor を Scheduler に変換します


サンプル10:コード

Executors.newFixedThreadPool() を利用

https://github.com/benkyokai/RxJavaStudy/blob/master/Week1/src/main/java/com/hjm/week3/Week3.java#L171


サンプル10:実行結果

subscribe: pool-1-thread-1
map1-1: pool-1-thread-2
map1-2: pool-1-thread-2
map1-3: pool-1-thread-2
map2-1: pool-1-thread-3
map2-2: pool-1-thread-3
map2-3: pool-1-thread-3
map3-1: pool-1-thread-4
map3-2: pool-1-thread-4
map3-3: pool-1-thread-4
onNext: pool-1-thread-1
onNext: pool-1-thread-1
onNext: pool-1-thread-1

おしまい


勉強会メモ

  • subscribeOn() を二回書くとどうなるか(サンプル11)
    • ⇒ 先に書いた subscribeOn() が適用される
  • subscribe() は複数回呼べるか?(サンプル12)
    • ⇒ 複数回呼べる
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした