LoginSignup
11

More than 5 years have passed since last update.

RxJavaがしてくれるというThread管理に、混乱したのでコードで実験

Last updated at Posted at 2017-12-12

RxJavaを使い始めたので、書いてみました。

始まりのコード

RxJavaを使えば、Threadの作成、管理をしなくてよく、お任せできるということなので便利そうなのですが、
勉強途中いったいどのThreadで何が起こっているのか分からなくなったので、書いて実験です。
まずは、 itemを3発 発射するObserableと,それに反応して処理するObserverをsubscribeします。
rxjava2でやります。

import io.reactivex.Observable;
public class RxJava2 {
  public static void main(String[] args){
    Observable.just("a","bb","ccc")
      .subscribe( (item)-> System.out.println("item :"+item));
  }
}

吐き出されたのは、

item :a
item :bb
item :ccc

Process finished with exit code 0

デフォルトでのRxJavaのスレッド

Observable側とObserver側それぞれが乗っかっているThreadはどれか見たい!

    Observable.just("a", "bb", "ccc")
      .doOnNext(c -> System.out.println("Emitting thread "+c+"\t" + Thread.currentThread().getName()))
      .subscribe((item) -> System.out.println("item : " + item + " Thread:\t" + Thread.currentThread().getName()));

doOnNextでObservable側をデバッグ、Observer側も、System.outで今のThreadの名前をプリントしました。

Emitting thread a   main
item : a Thread:    main
Emitting thread bb  main
item : bb Thread:   main
Emitting thread ccc main
item : ccc Thread:  main

Process finished with exit code 0

なるほど、すべてmainのスレッドで、走ってます。とくに何もしなければ非同期にならんということですね。

subscribeOnで非同期スレッド開始

では、非同期にしてみます。subscribeOnを使って、Thread管理してくれるSchedulerをなんか渡せよということなので、みてみます。Schedulersクラスのstatic関数からみつくろって、新しいThreadを作るタイプのSchedulerにお願いしてみます。
(ほかにも、CPUのコアに分けてThread処理してくれるやつや、io用にThreadPool使って、よろしくやってくれるやつもあるようです。)

    Observable.just("a", "bb", "ccc")
      .doOnNext(c -> System.out.println("Emitting thread "+c+"\t" + Thread.currentThread().getName()))
      .subscribeOn(Schedulers.newThread())
      .subscribe((item) -> System.out.println("item : " + item + " Thread:\t" + Thread.currentThread().getName()));

出力は、というと。

...

Process finished with exit code 0

なんもでんがなー。あ、そっか、非同期のため、main Thread終わるの早すぎで,非同期処理がまにあわず、なんも出ませんでした。sleepします。

    Observable.just("a", "bb", "ccc")
      .doOnNext(c -> System.out.println("Emitting thread "+c+"\t" + Thread.currentThread().getName()))
      .subscribeOn(Schedulers.newThread())
      .subscribe((item) -> System.out.println("item : " + item + " Thread:\t" + Thread.currentThread().getName()));

    System.out.println("--- main sleep start ---");
    Thread.sleep(1000);
    System.out.println("--- main sleep end   ---");
--- sleep start ---
Emitting thread a   RxNewThreadScheduler-1
item : a Thread:    RxNewThreadScheduler-1
Emitting thread bb  RxNewThreadScheduler-1
item : bb Thread:   RxNewThreadScheduler-1
Emitting thread ccc RxNewThreadScheduler-1
item : ccc Thread:  RxNewThreadScheduler-1
--- sleep end ---

Process finished with exit code 0

非同期OK。そして、全部、同じ新規Threadで処理されてしまいました。

observeOnでさらにスレッドを小分けに

Observerを、別Threadにするには、observeOnを使え、というのでやってみます。

   Observable.just("a", "bb", "ccc")
      .doOnNext(c -> System.out.println("Emitting thread "+c+"\t" + Thread.currentThread().getName()))
      .subscribeOn(Schedulers.newThread())
      .observeOn(Schedulers.newThread())
      .subscribe((item) -> System.out.println("item : " + item + " Thread:\t" + Thread.currentThread().getName()));

    System.out.println("--- main sleep start ---");
    Thread.sleep(1000);
    System.out.println("--- main sleep end   ---");
--- main sleep start ---
Emitting thread a   RxNewThreadScheduler-1
Emitting thread bb  RxNewThreadScheduler-1
Emitting thread ccc RxNewThreadScheduler-1
item : a Thread:    RxNewThreadScheduler-2
item : bb Thread:   RxNewThreadScheduler-2
item : ccc Thread:  RxNewThreadScheduler-2
--- main sleep end   ---

Process finished with exit code 0

お、Threadが分かれました!でも、出力順が変わった。
なんだか、Observableが全部 発射したあとに、Observerが、受け取ったようにみえる。いいのかな。
いや、それか、出力間隔が短くすぎで、ほとんど同時なため順番が変わったようにみえているだけなのだろうか。
発射の間隔を開いてみよう。
intevalというメソッドが間隔をあけてLong値を発射してくれるらしいので、やってみる。

   Observable.interval(1, TimeUnit.SECONDS, Schedulers.newThread())
      .doOnNext(c -> System.out.println("Emitting thread "+c+"\t" + Thread.currentThread().getName()))
      .observeOn(Schedulers.newThread())
      .subscribe((item) -> System.out.println("item : " + item + " Thread:\t" + Thread.currentThread().getName()));

    System.out.println("--- main sleep start ---");
    Thread.sleep(10000);
    System.out.println("--- main sleep end   ---");

1秒ごとに、0から1ずつ増やして、発射し、Main Threadには5秒寝てもらう。

--- main sleep start ---
Emitting thread 0   RxNewThreadScheduler-1
item : 0 Thread:    RxNewThreadScheduler-2
Emitting thread 1   RxNewThreadScheduler-1
item : 1 Thread:    RxNewThreadScheduler-2
Emitting thread 2   RxNewThreadScheduler-1
item : 2 Thread:    RxNewThreadScheduler-2
Emitting thread 3   RxNewThreadScheduler-1
item : 3 Thread:    RxNewThreadScheduler-2
Emitting thread 4   RxNewThreadScheduler-1
item : 4 Thread:    RxNewThreadScheduler-2
--- main sleep end   ---

よかった、Observableがemitしたあと、Observerが処理していることが念のため確認できた。

以上、非同期にするにはどうするかと、どのスレッドつかわれるかコードで確認してみました。
あースッキリ。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11