43
42

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

RxJava(RxAndroid)でのBackpressureについて

Posted at

RxJavaのAndroidバインディングであるRxAndroidを使い始めてBackpressureではまったので整理
retrolambdaでlambda式使っていますが、使っていなければ適宜newするコードに置き換えてください

Backpressure

Backpressure · ReactiveX/RxJava Wiki

通信処理にRxJava使った時に、

System.err﹕ rx.exceptions.MissingBackpressureException

みたいなエラーを吐いてしまうことがあったのでその対策

問題のあるコード

通信部分は例えばこんな感じ

Client.java
public Observable<Item> get() {
    return Observable.create(subscriber -> {
        subscriber.onStart();
        List<Object> items = // 通信
        for (int i = 0; i < items.size(); i++) {
            Item item = // parseする
            subscriber.onNext(Item);
        }
        subscriber.onCompleted();
    });
}

何か通信して取ってきてパースしてItemオブジェクトにしてからsubscriber#onNextでsubscriberに返す、という流れ
これをsubscribeする側はこんな感じ

MainActivity.java
private void fetchItems(String url) {
	new Client(url).get()
    	    .subscribeOn(Schedulers.io())
	        .observeOn(AndroidSchedulers.mainThread())
    	    .subscribe(
    	    		this::setResult,
    	    		Throwable::printStackTrace,
    	    		() -> Log.d(TAG, "complete"));
}

private void setResult(Item item) {
    // 通信結果を何かする
}

ioスレッドで通信してメインスレッドでsetResultメソッドを呼ぶ、という流れ
これだと上にも上げたようなエラーのstacktraceが表示された

W/System.err﹕ rx.exceptions.MissingBackpressureException
W/System.err﹕ at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:338)
W/System.err﹕ at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
W/System.err﹕ at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)

ワーカースレッドから送られてくるデータにメインスレッドでの処理が間に合ってないようなイメージだと思う

解決策

RxJava Wikiにもあるようにいくつか解決策があった

new Client(url).get()
		.onBackpressureBuffer() // ここ
   	    .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // .onBackpressureBuffer() ここだとダメ
   	    .subscribe(/** なにか処理 **/);

この例のようにonBackpressureBufferやその他onBackpressureDroponBackpressureBlockなどをメソッドチェーンでcallするといい感じに完了できる

  • onBackpressureBuffer
    • ワーカースレッドからの通知をバッファリングする
  • onBackpressureDrop
    • メインスレッドでの処理中にはワーカースレッドから通知せずに捨てる
  • onBackpressureBlock
    • メインスレッドの処理が終わるまでワーカースレッド処理を一時停止させる

理解がやや曖昧ですが、こんな感じのはず
observeOnより前にcallしなければいけない点に注意しなければいけない

43
42
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
43
42

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?