LoginSignup
239

More than 5 years have passed since last update.

詳解RxJava:Scheduler、非同期処理、subscribe/unsubscribe

Last updated at Posted at 2015-10-03

RxJavaで聞かれたり悩んだり、罠ったことを書き足していく予定です。
こちらもどうぞ: http://qiita.com/yuya_presto/items/152fa80d073d865cfc90

Scheduler、非同期処理

非同期でやるには?どこからが非同期で実行されるの?

Observableの実装や叩いたOperator(map()やfilter()などのメソッド)によりますが、基本的に現在のスレッド(subscribe()を呼び出したスレッド)で「同期的に」実行されます。

  • ドキュメントに Scheduler: This version of XXX does not operate by default on a particular Scheduler. と書かれているものは、現在のスレッドで実行されます
    • from(): 現在のスレッドで実行されます
    • Subject: onNext()などを呼び出したスレッドで実行されます
  • ドキュメントのScheduler欄にそれ以外の事が書かれている場合はそれに準じます
    • repeat(Scheduler scheduler)のようにschedulerを引数に取る場合は、指定したschedulerのスレッドで動作します
    • buffer(long timespan, long timeshift, java.util.concurrent.TimeUnit unit)はcomputation Schedulerで動作しますが、schedulerを引数に取るバージョンも存在します

一方で、ライブラリの場合はライブラリの仕様によるので、確認する必要があります。
例えばRetrofitは非同期です。HTTPリクエストを投げたスレッドで動作します。

同期的な処理のスレッドを切り替えて非同期なObservableにしたい場合は、Schedulerを使います。
observeOn()を使うと、呼んだ以降のストリームのスレッドを切り替えることができます。
subscribeOn()を指定すると、・・・これは説明が難しいので後述。

そもそもSchedulerってなんなの?

Schedulerは処理のqueueだと考えるとわかりやすいです。
ざっくりというと、ストリームに流れてきたアイテムをqueueに追加し、queueの中身が別のスレッド(もしくはAndroidのHandlerなど)で実行されます。どのスレッドで実行されるのか、並列で実行されるのかなどはそれぞれのSchedulerの実装に依存します。

非同期のObservableの結果をAndroidのUI更新に使いたい(=メインスレッド)

RxAndroidAndroidSchedulers.mainThread()を使うと実現できます。Jake神さすがです!!

asyncObservable
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(result -> render(result));

subscribeOn()とobserveOn()の違いって?

observeOn()はわかりやすく、それを呼び出した以降のストリームのスレッドを変えるだけです。

subscribeOn()は少し難しい。最終的な結果は同期的なObservableと非同期のObservableを分けて考える必要があります。

  • やること: Observableがsubscribe()された時に呼ばれる処理(OnSubscribe型で定義する)が実行されるスレッドを切り替えます
  • 同期的なObservableの場合: subscribe()の中で処理(Observable#from(List)ならlistのイテレーション)を実行しているので、subscribeOn()したスレッドで処理が実行されます
  • 非同期のObservableの場合: subscribe()の中で別のスレッドに処理(HTTPリクエストとか)を投げているので、subscribeOn()したスレッドではなく、そのスレッドで実行されます

同期的な場合のOnSubscribeの擬似コード:

class IterateListOnSubscribe implements OnSubscribe {
    void call(Subscriber subscriber) {
        for (T item : this.list) {
            subscriber.onNext(item); // onNext()はsubscribeOn()されたスレッドか、されてない場合現在のスレッドで実行される
        }
        subscriber.onCompleted();
    }
}

非同期の場合のOnSubscribeの擬似コード:

class HttpRequestOnSubscribe implements OnSubscribe {
    void call(Subscriber subscriber) {
        // executeの呼び出しはsubscribeOn()のSchedulerもしくはsubscribe()を呼び出したスレッド上で行われる
        this.httpClient.executeInBackgroundThread(new RequestCallback() {
            void onSuccess(Response response) {
                // Callbackの内側なので、非同期のリクエストが実行されたスレッドで呼び出される
                subscriber.onNext(response);
                subscriber.onCompleted();
            }

            void onFailure(RequestFailedException e) {
                subscriber.onError(e);
            }
        });
    }
}

subscribeOnは複数回呼び出した場合、最終的に一番根っこ側で指定されたSchedulerで実行されることになります。これはOnSubscribeが、subscribe()を呼び出した箇所からストリームの根っこに向かって順番に呼ばれるからです。詳しくはlift()の実装を読んでみてください。

onNext()とかの中でsynchronizedした方がいいの?

基本的に必要ありません。例外として、Subjectを使う場合は注意が必要です。後述します。

RxのonNext()などonXXX()のメソッド(Observer)は、設計(もしくはcontract(契約))上、複数のスレッドから呼ばれた場合でも必ず順番にしか呼ばれないと仮定されています。例えば、スレッドAがonNext()を呼び出している最中にスレッドBがonNext()を呼ぶようなことは起きません。スレッドAのonNext()がreturnしてからスレッドBが呼び出します。これによって、Observer側でsynchronizedしなくても、呼び出しとreturnの順番が前後しないようにできています。詳しくは、Rx Design Guidelinesの"Assume observer instances are called in a serialized fashion"に書かれています。

ただし、次の場合はsynchronizedが必要となる場合があります。副作用の塊であまりよいデザインではないはずです。

  • ストリームの中に流れてくるオブジェクトが別のスレッドから変更される
    • イミュータブル(または変更しない)なオブジェクトを使いましょう
  • 副作用のあるストリーム(map()のなかでストリームの外側のオブジェクトを変更するなど)
    • 外部の状態を使わずストリームで表現しましょう。doOnNext()などは正しい使い方ですが、必要に応じてobserveOn()するなどして、同じスレッドから呼ばれるようにしましょう

SubjectのonNext()とかをを複数のスレッドで呼びたい

Subjectを使う場合は、onNext()を自分で呼び出すことができるため注意が必要です。
何も考えずに複数のスレッドから呼び出すと、2つのスレッドが同じタイミングでObserverの呼び出しをしないという、前述の仮定が壊れてしまいます。queueを挟むことで同時に1つのスレッドでしか呼ばれないようにするSerializedSubjectを使うか、呼び出し側でsynchronizedを書いて自分で同期することで回避できます。

※getValue()は最終的にvolatileに保存された値を参照している(BehaviorSubjectの場合)ので、イミュータブルなオブジェクトであれば問題ありません。

flatMap()やmerge()を使った時のスレッドはどうなる?

非同期のObservableが混じっている場合、どのスレッドで実行されるかは「不定」です。SubjectのonNext()で触れたように複数スレッドの場合でも順番に呼ぶ必要があり、OperatorMergeクラスの実装を見るとqueueを使っていることがわかります。
必要に応じてobserveOn()を使いましょう。

※全て同期的なObservableの場合は、subscribeOn()のSchedulerもしくはsubscribe()したスレッドで実行されます。

非同期なObservableを同期的なObservableにしたいんだけど→やっちゃダメ

確かにtoBlocking()を使えばできるように見えますが、これはバッドノウハウです。非同期処理が実行されるスレッドと、現在のSchedulerの両方がブロックされます。場合によってはクラッシュします
https://github.com/ReactiveX/RxJava/issues/1804#issuecomment-61396523

非同期な処理を別のObservableから呼びたい場合は、flatMap()を使うなどしてストリームに連結してください。

observable.map(item -> asyncRequest(item).toBlocking().single()) // NG!!
        .map(...)

observable.flatMap(item -> asyncRequest(item)) // OK
        .map(...)

※同期的な処理に使うためにバグを直した際も抵抗感がある旨を伝えられたのですが、便利だから!Listを返却するような外部メソッドだったらどうにもならないから!ってアピールしてmergeしてもらった経緯があります(#3120)。

subscribe/unsubscribe

subscribeとunsubscribeって何やるの

Observableのsubscribe()の返り値はSubscription型です。これにはunsubscribe()メソッドが生えています。それぞれ下記のように機能します。

  • observable.subscribe()
    • 同期的なObservableの場合は、処理が完了するまで実行します
    • 非同期のObservableの場合は、処理を開始してreturnします
  • subscription.unsubscribe()
    • onError()やonCompleted()の際に自動的に呼ばれます
    • 同期的なObservableの場合は、subscribe()がreturnした時点ですでにunsubscribe()されているので効果がありません
    • 非同期のObservableの場合は、Observableの処理を中止します

なんでunsubscribeが大事なの

subscribe()すると、根っこのObservableからObserver(subscribe()のときに渡すcallback、onNext()など)が強参照されます。なので、使い終わったらunsubscribe()を呼び出さないと、Observerがleakします。onNext()ではよくthisを参照するので、AndroidだとActivityやFragmentがleakしてしまうのです。
これはAsyncTaskの悪い点でもあったのですが、unsubscribe()を呼べば(※かつOnSubscribeが正しく実装されていれば)キャンセル+リファレンスを解放できるのがRxJavaのメリットです。

上級者向け情報:new Observable()してOnSubscribeを自前実装する場合は、unsubscribe()されたときにsubscriberを開放するように正しく実装しないと、末端のObserverがleakしてしまう場合がありそうです。AbstractOnSubscribeを参照してみてください。もしくは、rxjava-asyncモジュールを使うのもよいかもしれません。

Androidのライフサイクルは?subscribe/unsubscribeはいつ?

subscribe/unsubscribeは基本的にonStart()→onStop()(visible lifetime)がおすすめです。
ドキュメントにて、visible lifetimeの例として、UIの更新のためのBroadcastIntent受信開始する、と書かれているのでピタリと一致します。
http://developer.android.com/reference/android/app/Activity.html#ActivityLifecycle

ただし、Fragmentの操作をonPause()(実際はonSavedInstaceState())以降にやるとIllegalStateExceptionが出る悪しき仕様があるので、この点には注意が必要です(しかし特段対策できるわけではない・・・画面回転時のrelaunchはonPause()、onSaveInstanceState()、onStop()が一気に呼ばれるようなので問題は発生しないはずですが、Android 2.xは違うかもしれません・・)。

CompositeSubscriptionでまとめてunsubscribe

subscribe()するたびにメンバ変数にSubscriptionを突っ込んでいき、最後にunsubscribe()を呼びまくるのもめんどうなので、Subscriptionをひとまとまりにして、まとめてunsubscribe()するためのクラスがCompositeSubscriptionです。

public void onStart() {
    super.onStart()
    Subscription subscriptionA = asyncRequest().observeOn(mainThread()).subscribe(result -> renderA(result));
    Subscription subscriptionB = anotherAsyncRequest().observeOn(mainThread()).subscribe(result -> renderB(result));
    mCompositeSubscription = new CompositeSubscription(subscriptionA, subscriptionB);
    Subscription subscriptionC = yetAnotherAsyncRequest().observeOn(mainThread()).subscribe(result -> renderC(result));
    mCompositeSubscription.add(subscriptionC);
}

public void onStop() {
    mCompositeSubscription.unsubscribe();
    super.onStop();
}

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
239