RxJavaのAndroidバインディングであるRxAndroidを使い始めてBackpressureではまったので整理
retrolambdaでlambda式使っていますが、使っていなければ適宜new
するコードに置き換えてください
Backpressure
Backpressure · ReactiveX/RxJava Wiki
通信処理にRxJava使った時に、
System.err﹕ rx.exceptions.MissingBackpressureException
みたいなエラーを吐いてしまうことがあったのでその対策
問題のあるコード
通信部分は例えばこんな感じ
public Observable<Item> get() {
return Observable.create(subscriber -> {
subscriber.onStart();
List<Object> items = // 通信
for (int i = 0; i < items.size(); i++) {
Item item = // parseする
subscriber.onNext(Item);
}
subscriber.onCompleted();
});
}
何か通信して取ってきてパースしてItem
オブジェクトにしてからsubscriber#onNext
でsubscriberに返す、という流れ
これをsubscribe
する側はこんな感じ
private void fetchItems(String url) {
new Client(url).get()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
this::setResult,
Throwable::printStackTrace,
() -> Log.d(TAG, "complete"));
}
private void setResult(Item item) {
// 通信結果を何かする
}
ioスレッドで通信してメインスレッドでsetResult
メソッドを呼ぶ、という流れ
これだと上にも上げたようなエラーのstacktraceが表示された
W/System.err﹕ rx.exceptions.MissingBackpressureException
W/System.err﹕ at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:338)
W/System.err﹕ at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
W/System.err﹕ at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)
ワーカースレッドから送られてくるデータにメインスレッドでの処理が間に合ってないようなイメージだと思う
解決策
RxJava Wikiにもあるようにいくつか解決策があった
new Client(url).get()
.onBackpressureBuffer() // ここ
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
// .onBackpressureBuffer() ここだとダメ
.subscribe(/** なにか処理 **/);
この例のようにonBackpressureBuffer
やその他onBackpressureDrop
、onBackpressureBlock
などをメソッドチェーンでcallするといい感じに完了できる
-
onBackpressureBuffer
- ワーカースレッドからの通知をバッファリングする
-
onBackpressureDrop
- メインスレッドでの処理中にはワーカースレッドから通知せずに捨てる
-
onBackpressureBlock
- メインスレッドの処理が終わるまでワーカースレッド処理を一時停止させる
理解がやや曖昧ですが、こんな感じのはず
observeOn
より前にcallしなければいけない点に注意しなければいけない