Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
42
Help us understand the problem. What is going on with this article?
@petitviolet

RxJava(RxAndroid)でのBackpressureについて

More than 5 years have passed since last update.

RxJavaのAndroidバインディングであるRxAndroidを使い始めてBackpressureではまったので整理
retrolambdaでlambda式使っていますが、使っていなければ適宜newするコードに置き換えてください

Backpressure

Backpressure · ReactiveX/RxJava Wiki

通信処理にRxJava使った時に、

System.err﹕ rx.exceptions.MissingBackpressureException

みたいなエラーを吐いてしまうことがあったのでその対策

問題のあるコード

通信部分は例えばこんな感じ

Client.java
public Observable<Item> get() {
    return Observable.create(subscriber -> {
        subscriber.onStart();
        List<Object> items = // 通信
        for (int i = 0; i < items.size(); i++) {
            Item item = // parseする
            subscriber.onNext(Item);
        }
        subscriber.onCompleted();
    });
}

何か通信して取ってきてパースしてItemオブジェクトにしてからsubscriber#onNextでsubscriberに返す、という流れ
これをsubscribeする側はこんな感じ

MainActivity.java
private void fetchItems(String url) {
    new Client(url).get()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    this::setResult,
                    Throwable::printStackTrace,
                    () -> Log.d(TAG, "complete"));
}

private void setResult(Item item) {
    // 通信結果を何かする
}

ioスレッドで通信してメインスレッドでsetResultメソッドを呼ぶ、という流れ
これだと上にも上げたようなエラーのstacktraceが表示された

W/System.err﹕ rx.exceptions.MissingBackpressureException
W/System.err﹕ at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:338)
W/System.err﹕ at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:115)
W/System.err﹕ at rx.internal.operators.OperatorSubscribeOn$1$1$1.onNext(OperatorSubscribeOn.java:76)

ワーカースレッドから送られてくるデータにメインスレッドでの処理が間に合ってないようなイメージだと思う

解決策

RxJava Wikiにもあるようにいくつか解決策があった

new Client(url).get()
        .onBackpressureBuffer() // ここ
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // .onBackpressureBuffer() ここだとダメ
        .subscribe(/** なにか処理 **/);

この例のようにonBackpressureBufferやその他onBackpressureDroponBackpressureBlockなどをメソッドチェーンでcallするといい感じに完了できる

  • onBackpressureBuffer
    • ワーカースレッドからの通知をバッファリングする
  • onBackpressureDrop
    • メインスレッドでの処理中にはワーカースレッドから通知せずに捨てる
  • onBackpressureBlock
    • メインスレッドの処理が終わるまでワーカースレッド処理を一時停止させる

理解がやや曖昧ですが、こんな感じのはず
observeOnより前にcallしなければいけない点に注意しなければいけない

42
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
petitviolet
一発当てたい

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
42
Help us understand the problem. What is going on with this article?