Java
Android
RxJava
RxJava2

RxJava2.0 Observable, Single, Maybe, Completableの使い分けメモ

More than 1 year has passed since last update.


概要

RxJava2.0でObservableに似たFlowableやMaybeが追加されました。

またSingleやCompletableも以前から存在します。

つまりObservable, Flowable, Single, Maybe, Completableが存在します。

これらはObservableのように、item(Javaのインスタンスなど)をemit(排出)します。

使い分けを行うことで、使う側のObserver(Subscriber)で考えることを減らすことができます。

それぞれどのような特徴があり、どのように使い分けるかを見ていきます。

FlowableはBackpressureに関するもので少しジャンルが違うので省略しています。(正直自分がBackpressureが必要になるほど使いこなせていないです。。)

何か誤りなどございましたらご指摘お願いします。(編集リクエストいただけるとうれしいです。)


それぞれの特徴と使い分け


Observableおさらい

Observableは以下のように何回もitemをemitすることができます。

この例では"Hello"をemitter.onNextを呼ぶことで三回emitしています。

Observable.create((ObservableOnSubscribe<String>) emitter -> {

try {
// 文字列"Hello"をemitする
emitter.onNext("Hello");
emitter.onNext("Hello");
emitter.onNext("Hello");
// 完了状態にする
emitter.onComplete();
} catch (Exception ex) {
// 例外を知らせる
emitter.onError(ex);
}
}).subscribe(new DisposableObserver<String>() {
@Override
public void onNext(String value) {
// 三回呼ばれる
System.out.println(value);
}

@Override
public void onError(Throwable e) {
}

@Override
public void onComplete() {
// 最後に一回呼ばれる
}
});


Observer(Subscriber)でどう呼ばれるか?

oNext* (onError | onComplete)?

つまり0からN回oNextが呼ばれ、その後にonErrorまたはonCompleteのどちらかが呼ばれるかもしれないという呼ばれ方です。


Observer(Subscriber)が考えるパターン


  • エラー

  • 何もemitも完了もしない場合

  • 完了したが一個もitemがemitされない場合

  • 一個emitされて完了した場合

  • 複数emitされて完了した場合

  • 一個またはいくつかemitされて完了が呼ばれない場合

を考慮してプログラミングする必要があります。


使い所


  • GUIのイベントなど

  • 複数回emit(onNext)する場合でBackpressureを利用しない場合。


Single

emitterのonSuccessを呼ぶと一回だけObserver(Subscriber)のonSuccessが呼ばれます。(itemが一個だけemitされます。)複数回onSuccessを呼んでも一回だけしか呼ばれません。

onCompleteメソッドはないです。

つまり、onSuccessは引数が必要なので、何もアイテムを流さずにonCompleteメソッドを呼んだりできません。

Single.create((SingleOnSubscribe<String>) emitter -> {

try {
// 一個emitして完了
emitter.onSuccess("Single Hello");
} catch (Exception ex) {
emitter.onError(ex);
}
}).subscribe(new DisposableSingleObserver<String>() {
@Override
public void onSuccess(String value) {
// 一回呼ばれる
}

@Override
public void onError(Throwable e) {

}
});


Observer(Subscriber)でどう呼ばれるか?

(onSuccess | onError)?

つまり、onSuccessかonErrorが一回のみ呼ばれるか呼ばれないかです。


Observer(Subscriber)が考えるパターン


  • エラー

  • 何もemitも完了もしない場合

  • 一個emitされて完了した場合

を考慮してプログラミングする必要があります。Observableに比べると考えるべきことがかなり減りました。


使い所


  • 値を流す(itemをemit)かエラーかのどちらかの場合

  • RetrofitでAPIからGETしてくるときなど


Maybe

emitterのonSuccessまたはonCompleteまたはonErrorを呼ぶと一回だけObserver(Subscriber)の対応するメソッドが呼ばれます。

onCompleteメソッドもあるので、何もアイテムを流さずにonCompleteメソッドを呼んで完了できます。

Maybe.create((MaybeOnSubscribe<String>) emitter -> {

try {
String result = "Maybe Hello";
if (TextUtils.isEmpty(result)) {
// アイテムなしの場合はそのまま完了
emitter.onComplete();
return;
}
// 一個流して完了
emitter.onSuccess(result);
} catch (Exception ex) {
emitter.onError(ex);
}
}).subscribe(new DisposableMaybeObserver<String>() {
@Override
public void onSuccess(String value) {
// 一回呼ばれる
}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
});


Observer(Subscriber)でどう呼ばれるか?

(onSuccess | onError | onComplete)?

onSuccessかonErrorかonCompleteのどれかが呼ばれる、または全く呼ばれません。


Observer(Subscriber)が考えるパターン


  • エラー

  • 何もemitも完了もしない場合

  • 完了したが一個もitemがemitされない場合

  • 一個emitされて完了した場合

を考慮してプログラミングする必要があります。


使い所


  • 値を流さないことが正常なことである場合

  • キャッシュ

    例えばDBにキャッシュしていて、まだキャッシュがないときはonComplete、キャッシュがあるときはonSuccessで流す


Completable

onCompleteメソッドまたはonErrorを一回だけ呼び出すことができます。

アイテムは何も流すことができません。終了するのみです。

Completable.create(emitter -> {

try {
// 何らかの処理
// 完了
emitter.onComplete();
} catch (Exception ex) {
emitter.onError(ex);
}
}).subscribe(new DisposableCompletableObserver<String>() {
@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {
// 一回呼ばれる
}
});


Observer(Subscriber)でどう呼ばれるか?

(onError | onComplete)?

onErrorかonCompleteのどれかが呼ばれる、または全く呼ばれません。


Observer(Subscriber)が考えるパターン


  • エラー

  • 何もemitも完了もしない場合

  • 完了したが一個もitemがemitされない場合


使い所


  • APIのPOST

  • DBへの保存

  • 返り値がない非同期処理


まとめ

使うことで種類が増えてしまうので学習コストが少し上がってしまうというデメリットは存在しますが、

Observer(Subscriber)で考えることが少なくてすむというメリットが存在します。

また利用しようとしなくても、使う必要が出てきてしまいます。例えば、最初に流れてきたものを取り出すObservable#firstを使うとSingleが返ってきたりなど、RxJava 2.0を使う上で完全には避けて通れないものとなりそうです。


参考

What's different in 2.0

https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0

Exploring RxJava 2 for Android (GOTOcph October 2016)

https://speakerdeck.com/jakewharton/exploring-rxjava-2-for-android-gotocph-october-2016

新しく追加されたCompletableについて

http://qiita.com/kazy/items/217eee4e167395d45478

rx.Singleについて

http://qiita.com/izumin5210/items/24449aee848b5ddbcf6d

rx.Single

http://angusmorton.com/rx-single/

http://stackoverflow.com/questions/36152475/why-to-use-new-feature-single-in-rxjava

http://stackoverflow.com/questions/40323307/observable-vs-flowable-rxjava2

https://github.com/ReactiveX/RxJava/issues/4534#issuecomment-246399782