若干の釣り気味ですが、RxJavaを利用されている方には結構重要そうだよな−と思う話題だったのでQiitaにまとめました。
ことのはじまり
Check out the new fromAsync() operator for adapting callback-based sources into Observables. Stop using create()! https://t.co/3SvYMmgeWx
— Jake Wharton (@JakeWharton) 2016年7月10日
なんだかJakeが過激なこと言ってる気配があったので、背景を調べるために少しissueを漁り始めました。
きっかけのissue
https://github.com/ReactiveX/RxJava/issues/4177
すごく雑に3行でまとめると、
- 非Rxの世界の非同期処理をラップするのに
Observable#create
使うよね - でもback-puressureまでちゃんと考慮して実装するのクソ難しいよね(僕も多くの失敗から学んでやっと正しい実装をやっと理解したよ)
- みんなも間違った実装しがちだから、非同期処理をラップしやすいインターフェースあったほうがよくないかな?
という提案です。もう少しだけ文脈を補足すると、前提となっているのはn個のitemをemitするようなObservableについてです。Http通信のやり取りなどrx.Singleで事足りる場合は今回の議論の対象ではなさそうですね。
どうなったの?
2016/07/12地点で最新のver 1.1.7において、Observablue#fromAsync
が追加されました。
インターフェースはObservable#create
を使う場合とほとんど一緒ですが、第二引数にBackpressureModeというものを明示的に指定する必要があって、itemを貯めておきたいのか、最新だけ保持したいのかなど決めることが出来ます。ユースケースは様々だと思いますが、BUFFERとLATEST辺りがよく使われそうですね。
Observable.fromAsync(new Action1<AsyncEmitter<Item>>() {
@Override
public void call(final AsyncEmitter<Item> emitter) {
XyzClient.enqueue(new Callback<List<Item>>() {
@Override
public void onResponse(Response<Item> response) {
emitter.onNext(response.body);
}
@Override
public void onFailure(Call<List<Item>> call, Throwable t) {
emitter.onError(t);
}
});
}
}, AsyncEmitter.BackpressureMode.BUFFER);
私は普段Promise的な使い方しかほとんど利用しないので、なるほどという気持ちもありつつ、あまりピンときていません。ただ今後Observable#create
を敢えて利用するメリットも特になさそうなので、Observablue#fromAsync
で書いていこうという気持ちです。 正直わかってない部分もあるので、より詳しい人が詳細に解説して頂けると嬉しいです! お待ちしています。