Java
RxJava

Rxjava onBackpressureBufferはいつ溢れるか?

前提知識

RxJavaにはバックプレッシャと呼ばれる、流量制御の仕組みがある。
参考 詳解RxJava2:Backpressureで流速制御

onBackpressureBuffer

PublisherがSubscriberの消費スピードよりも早くデータを作った場合、バッファに生成されたデータを溜めておきたい。
このために、onBackpressureBufferメソッドが存在する。
buffer

このとき、引数でバッファの上限を設定できる
onBackpressureBuffer(int capacity)

observeOn

Subscriberの実行スレッドを切り替えたい。
このためにobserveOnメソッドが存在する。
observeOm

第3引数にbufferSizeが指定でき、request(bufferSize)が裏で実行される。
observeOn(Scheduler scheduler, boolean delayError, int bufferSize)

サンプル

Publisherが100ms毎にデータを生成し、Subscriberが1000msかけて消費する。

TestRxJava.java
    @Test
    public void バッファが溢れる() throws InterruptedException {
        Flowable.interval(100, TimeUnit.MILLISECONDS)
                //.doOnNext(x -> logger.info("1 doOnNext " + x))
                //.doOnRequest(t -> logger.info("1 doOnRequest " + t))
                //.doOnError(e -> logger.info("1 doOnError " + e))
                .onBackpressureBuffer(5)
                //.doOnNext(x -> logger.info("2 doOnNext " + x))
                //.doOnRequest(t -> logger.info("2 doOnRequest " + t))
                //.doOnError(e -> logger.info("2 doOnError " + e))
                .observeOn(Schedulers.computation(), false, 3)
                //.doOnNext(x -> logger.info("3 doOnNext " + x))
                //.doOnRequest(t -> logger.info("3 doOnRequest " + t))
                //.doOnError(e -> logger.info("3 doOnError " + e))
                .subscribe(e -> {
                    TimeUnit.SECONDS.sleep(1);
                }, Throwable::printStackTrace);
        TimeUnit.MINUTES.sleep(10);
    }

図にすると以下のようになる。
diagram.jpg

これを実行すると、以下のようなログが出力される。

22:11:12.850 [main] INFO TestRxJava - 3 doOnRequest 9223372036854775807
22:11:12.856 [main] INFO TestRxJava - 2 doOnRequest 3
22:11:12.856 [main] INFO TestRxJava - 1 doOnRequest 9223372036854775807
22:11:12.958 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 0
22:11:12.959 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 0
22:11:12.959 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 0
22:11:13.058 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 1
22:11:13.058 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 1
22:11:13.158 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 2
22:11:13.158 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 2
22:11:13.258 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 3
22:11:13.359 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 4
22:11:13.460 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 5
22:11:13.558 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 6
22:11:13.659 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 7
22:11:13.759 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 8
22:11:13.859 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 9
22:11:13.959 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 10
22:11:13.960 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 1
22:11:14.059 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 11
22:11:45.094 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 2
io.reactivex.exceptions.MissingBackpressureException: Buffer is full
    at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FlowableOnBackpressureBuffer.java:99)
    at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:92)
    at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
    at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:92)
    at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:93)
    at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:38)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
22:11:47.421 [RxComputationThreadPool-1] INFO TestRxJava - 2 doOnRequest 3
22:11:47.422 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnError io.reactivex.exceptions.MissingBackpressureException: Buffer is full
22:11:47.422 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnError io.reactivex.exceptions.MissingBackpressureException: Buffer is full

ログから、11のデータが生成されたときにBuffer is fullになっている。

しかし、Subscriberは2のデータまでしか消費していない。
22:11:13.158 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 2

したがって、onBackpressureBufferに9のデータを溜めようとしたときにエラーになっている。
設定したcapacityは5なのに・・・。
onBackpressureBuffer(int capacity)

バッファが溢れるタイミング

onBackpressureBufferメソッドは裏でFlowableOnBackpressureBufferを作る。
このとき、内部でバッファ管理するために以下のようにQueueが作られていた。
FlowableOnBackpressureBuffer.javaのGithubソースコード

で、このQueue実装がこんなかんじになっていて、キューサイズの指定が2のx乗(1,2,4,8,16,32...)に切り上げた値になっていた。なので今回の場合は、4 < 5 < 8 で実際のキャパシティが8。

別パターンで試す。onBackpressureBuffer(10)にすると、 8 < 10 < 16のため、16に切り上げられるはず。

TestRxJava.java
    @Test
    public void バッファが溢れる() throws InterruptedException {
        Flowable.interval(100, TimeUnit.MILLISECONDS)
                .doOnNext(x -> logger.info("1 doOnNext " + x))
                .doOnRequest(t -> logger.info("1 doOnRequest " + t))
                .doOnError(e -> logger.info("1 doOnError " + e))
                .onBackpressureBuffer(10)
                .doOnNext(x -> logger.info("2 doOnNext " + x))
                .doOnRequest(t -> logger.info("2 doOnRequest " + t))
                .doOnError(e -> logger.info("2 doOnError " + e))
                .observeOn(Schedulers.computation(), false, 3)
                .doOnNext(x -> logger.info("3 doOnNext " + x))
                .doOnRequest(t -> logger.info("3 doOnRequest " + t))
                .doOnError(e -> logger.info("3 doOnError " + e))
                .subscribe(e -> {
                    TimeUnit.SECONDS.sleep(1);
                }, Throwable::printStackTrace);
        TimeUnit.MINUTES.sleep(10);
    }

ログを見ると、17こめのデータをバッファに溜めるときに例外が出ているのがわかる。

07:55:03.742 [main] INFO TestRxJava - 3 doOnRequest 9223372036854775807
07:55:03.742 [main] INFO TestRxJava - 2 doOnRequest 3
07:55:03.742 [main] INFO TestRxJava - 1 doOnRequest 9223372036854775807
07:55:03.852 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 0
07:55:03.852 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 0
07:55:03.852 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 0
07:55:03.977 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 1
07:55:03.977 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 1
07:55:04.071 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 2
07:55:04.071 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnNext 2
07:55:04.180 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 3
07:55:04.258 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 4
07:55:04.352 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 5
07:55:04.461 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 6
07:55:04.555 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 7
07:55:04.658 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 8
07:55:04.752 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 9
07:55:04.861 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnNext 1
07:55:04.861 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 10
07:55:04.950 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 11
07:55:05.054 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 12
07:55:05.151 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 13
07:55:05.264 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 14
07:55:05.357 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 15
07:55:05.451 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 16
07:55:05.561 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 17
07:55:05.657 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 18
07:55:05.751 [RxComputationThreadPool-2] INFO TestRxJava - 1 doOnNext 19
07:55:05.751 [RxComputationThreadPool-2] INFO TestRxJava - 2 doOnError io.reactivex.exceptions.MissingBackpressureException: Buffer is full
io.reactivex.exceptions.MissingBackpressureException: Buffer is full
07:55:05.876 [RxComputationThreadPool-1] INFO TestRxJava - 3 doOnError io.reactivex.exceptions.MissingBackpressureException: Buffer is full
    at io.reactivex.internal.operators.flowable.FlowableOnBackpressureBuffer$BackpressureBufferSubscriber.onNext(FlowableOnBackpressureBuffer.java:99)
    at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:92)
    at io.reactivex.internal.operators.flowable.FlowableDoOnLifecycle$SubscriptionLambdaSubscriber.onNext(FlowableDoOnLifecycle.java:79)
    at io.reactivex.internal.operators.flowable.FlowableDoOnEach$DoOnEachSubscriber.onNext(FlowableDoOnEach.java:92)
    at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:93)
    at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:38)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

結論

onBackpressureBuffer の capacity はざっくりとした指定(2のx乗(1,2,4,8,16,32...)に切り上げる)のため、厳密な値ではない様子。