Java
Android
RxJava
ReactiveX

逆引きよく使うRxJava1.x オペレータ

More than 1 year has passed since last update.


まえがき

私がよく使うRxJavaのオペレータの用法とユースケースを書き連ねていきます。

なにか間違いがありましたらコメントにて遠慮無く指摘をお願いします。


  • 匿名クラス使って書くと辛いので全部ラムダで書きます。

  • 型の情報を見せるためにあえてラムダを冗長に書いたりしています。


ラムダわからん

(callメソッドの引数 -> callメソッドのなかみ)


匿名クラス

observable.map(new Func1<Foo, Bar>() {

@Override
public Bar call(Foo foo) {
return foo.toBar()
}
});


ラムダ

observable.map((Foo foo) -> {

return foo.toBar()
});

// or
observable.map((Foo foo) -> foo.toBar())
observable.map(foo -> foo.toBar())


Observableを生成したい


アイテムを1つだけエミットするObservable


just

Observable.just(0) // 0

1つだけアイテムをエミットするObservableが生成されます。


リストの中身をエミットしてくれるObservable


from

Observable.from(intList)

intListの中身を1個ずつエミットしてくれるObservableが生成されます。


何もエミットしないObservable


empty

Observable.empty()

何もアイテムをエミットせず、onCompletedシグナルがエミットされるObservableが生成されます。


エラーだけをエミットするObservable


error

Observable.error(throwable)

何もアイテムをエミットせず、引数で受け取ったThrowableをonErrorシグナルとしてエミットするObservableが生成されます。


自由に生成したい


create

Observable.create(subscriber -> {

subscriber.onNext(0);
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onCompleted();
})

自分で自由に生成できます。

あわせて読んでほしい: http://qiita.com/kazy/items/9c8a97975023d0661a6c


エミットされるアイテムをフィルターしたい


filter

Observable.from(intList)        // 0, 1, 2, 3, 4, 5

.filter(i -> i%2 == 0) // 0, 2, 4

filterに渡した関数がtrueを返したときのみそのアイテムがエミットされます。


エミットされるアイテムを変換したい


map

api.fetchFoo()

.map((FooResponse fooRes) -> {
Foo foo = FooConverter.convert(fooRes);
return foo;
})

APIから取得したJsonをそのままクラスにしてもアプリ内では利用しづらいことがあります。

Observable<FooResponse>Observable<Foo>に変換しました。


ListがエミットされるのをListの中身がエミットされるように変換したい


flatMap

getFooListAsObservable().flatMap((List<Foo> fooList) -> Observable.from(fooList) )

Observable<List<Foo>Observable<Foo>に変換しました。


Observableを合成したい


それぞれのストリームがエミットするアイテムをひとつに合成する


zip

Observable.zip(Observable.just("1","2","3"),

Observable.just("A","B","C"),
(String number, String alphabet) -> number + alphabet )

1,2,3といったようにStringをエミットするObservableと

A,B,CといったようにStringをエミットするObservableを合成しました。

1A,2B,3CといったようにStringをエミットします。


それぞれのストリームを連結する


concat

Observable.concat(Observable.just("1","2","3"), Observable.just("A","B","C"))

1,2,3といったようにStringをエミットするObservableと

A,B,CといったようにStringをエミットするObservableを連結しました。

第一引数に渡したObservableのConpleteシグナルがエミットされると、第二引数に渡したObservableのアイテムがエミットされます。この場合1,2,3,A,B,Cとなります。


それぞれのストリームを合成する


merge

Observable.merge(Observable.just("1","2","3"), Observable.just("A","B","C"))

concatは単純に連結していましたが、mergeはそれぞれのアイテムがエミットされた順番を保って合成します。

それぞれのストリームにおいて

0秒に1

1秒にA

2秒に2

3秒にB

4秒に3

5秒にC

といった具合にエミットされた場合、1,A,2,B,3,Cとなります。


非同期処理


非同期処理を逐次実行したい


flatMap

api.fetchFoo()

.flatMap(api.fetchBar())

Fooを取得した後にBarを取得し始めます。

api.fetchFoo()

.flatMap(foo -> api.fetchBuz())
.flatMap(buz -> api.fetchBar())

Barの取得前にBuzを取得して欲しいなんて事になってもflatMapでObservableをねじ込むだけで済みます。


非同期処理を並列実行したい


zip

Observable.zip(api.fetchFoo(), api.fetchBar(), (Foo foo, Bar bar) -> null)

第1引数と第2引数に並列化したいObservableを渡します。第3引数は、両Observableがエミットしてくるアイテムを引数にとる関数を渡します。


ストリームに処理を挟み込みたい


doOnSubscribe/doOnUnSubscribe

api.fetchFoo()

.doOnSubscribe(showProgressBar())
.doOnUnSubscribe(hideProgressBar())

doOnSubscribeはそのObservableのsubscribe時に実行されます。

doOnUnSubscribeはSubscription#unsubscribeやCompositeSubscription#unsubscribe、onComplete後やonError後に実行されます。


doOnNext

api.fetchFoo()

.doOnNext((Foo foo) -> fooDao.insert(foo))
.flatMap(api.fetchBar())
.doOnNext((Bar bar) -> barDao.insert(bar))

doOnNextはそのObservableのアイテムエミット時にSubscriber#onNextより前に実行され、アイテムを参照することができます。

doOnNextの処理が終了するまでSubscriber#onNextは実行されません。


doOnCompleted/doOnError

doOnNextと似ているので割愛。


エラー処理


エラー時にリトライしたい


retry

api.fetchFoo()

.retry()

エラーが発生するとretryします。

retryにはいくつかオーバーロードメソッドが用意されており、retry回数を指定できたりもします。


条件をつけてリトライしたい


retryWhen

executeFoo().retryWhen((Observable<? extends Throwable> trigger) ->

trigger.filter(throwable -> throwable instanceOf FooException)
)

エラーが発生するとretryWhenに渡した関数が呼びだされ、Observable<? extends Throwable> triggerを取得できます。

triggerがThrowableをエミットすると元の処理(この場合executeFoo())がリトライされます。

サンプルコードの場合、そのThrowableがFooExceptionであった場合にのみエミットされるようにfilterしていますので、FooException以外のエラーが発生した場合にはリトライされません。


ユーザーにリトライするかどうかを委ねる

executeFoo().retryWhen((Observable<? extends Throwable> trigger) ->

trigger.flatMap((Throwable throwable) -> {
return Observable.create(subscriber -> {
Snackbar.make(view, "retry?", Snackbar.LENGTH_INDEFINITE)
.setAction("yes",v -> {
subsriber.onNext(null);
subscriber.onCompleted();
})
.show();
});
})
)

必ずしもthrowableをハンドリングする必要はありません。

このサンプルコードではflatMapを用いてユーザーがSnackBar(AndroidのViewのひとつ)のボタンをクリックした時のみにリトライをするように変換しています。


エラー時に代替値を利用したい


onErrorReturn

apiFetchFoo().onErrorReturn(throwable -> cache.loadFoo())

onErrorReturnを用いればerror時に代替となる値をエミットすることが可能です。

サンプルコードではAPIからFooの取得に失敗した際にcacheのFooを利用するようにしています。


おわり

よく使うのはこんなかんじですかね。他にもエミットの頻度を調整するオペレーターとかいろいろありますがとりあえずこれにて。