0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

RxJavaでFlowableを使って流量制限

Last updated at Posted at 2022-07-09

RxJava の Flowable を使った流量制限(Backpressure)の方法について調べる機会があったので、その時に分かったことをまとめる。
(意外と欲しい日本語情報少なかったので)

【追記】
2023/9に確認していていろいろおかしかったので大幅に書き直した

Observable と Flowable

RxJavaでは上流(Subscriber)と下流(Observer)で分けて考え、Java8に導入された Stream の終端操作とそれ以外の処理のような形で書くことができる。
例えば並列実行で使用する Observable は以下のような感じで記載できる。

Observable.fromIterable(IntStream.range(0, 10)
                                 .peek(System.out::println)::iterator)
          .blockingSubscribe(e -> {
            Thread.sleep(1000);
            System.out.println("num: " + e);
        });
出力結果
0
1
2
3
4
5
6
7
8
9
num: 0
num: 1
num: 2
num: 3
num: 4
num: 5
num: 6
num: 7
num: 8
num: 9

Stream と同様、上記のコードでは、subscribe を実行しない場合は、Observable#fromIterable で指定した上流(Subscriber)の処理は実行されない。
また、 Observable の場合は上流の処理と下流の処理は非同期に実行される。
(上流と下流で単一のスレッドを使用するようなケースを除く)

上流と下流で分けて記載できる以上、並列実行可能な処理のはずなので、処理時間を考えるとその処理は喜ぶべき動作ではあるのだが、
上流と下流で処理時間に大きな差があると致命的な問題が発生しうる。

上流と下流で処理が独立して動作するということは、上流の結果がどこかのメモリーに保持され、下流の処理を待っているということである。
即ち、下流の処理に時間が掛かる場合、そこに使用するメモリーが肥大化するため、OutOfMemory(OOM)の危険性がある。
当然のことだが、上流の結果は下流で使用されるデータのため、GCでも解放できない。
このようなケースでは、 Observable の代わりに流量制限が可能な Flowable を使用する。

Flowable#generate

上流の処理量を下流で完全に制御するには Flowable を使用すれば良い。

Flowable.fromIterable(IntStream.range(0, 10)
                               .peek(System.out::println)::iterator)
        .blockingSubscribe(e -> {
            Thread.sleep(1000);
            System.out.println("num: " + e);
        }, 3);
出力結果
0
1
2
3
num: 0
num: 1
num: 2
4
5
6
num: 3
num: 4
num: 5
7
8
9
num: 6
num: 7
num: 8
num: 9

これを使用すると、上流の処理は下流の処理がリクエストする度に実行されるので、必ず処理量が一定に保たれる。

ちなみに上記のように buffer size を指定すれば自動でその量を超えないように実行するが、以下のように下流の onNext で非同期で処理するような場合は意図した通りに挙動しない。

final var scheduler = Schedulers.from(Executors.newScheduledThreadPool(3));
Flowable.fromIterable(IntStream.range(0, 10)
                               .boxed()
                               .peek(System.out::println)::iterator)
        .subscribeOn(scheduler)
        .blockingSubscribe(i -> CompletableFuture.runAsync(() -> System.out.println("num: " + i)),
                           3);
出力結果
0
1
2
3
4
5
6
7
8
9
num: 7
num: 0
num: 3
num: 6
num: 9
num: 4
num: 2
num: 5
num: 1
num: 8

上記のような場合は、以下のように下流でバッファサイズと上流からのリクエストを指定することで流量制限できる。

final var scheduler = Schedulers.from(Executors.newScheduledThreadPool(3));
Flowable.fromIterable(IntStream.range(0, 10)
                               .boxed()
                               .peek(System.out::println)::iterator)
        .subscribeOn(scheduler)
        .observeOn(scheduler, false, 3)
        .blockingSubscribe(new FlowableSubscriber<>() {
            private Subscription subscription;
            @Override
            public void onSubscribe(@Nonnull Subscription s) {
                subscription = s;
                subscription.request(1L);
            }

            @Override
            public void onNext(Integer i) {
                CompletableFuture.runAsync(() -> System.out.println("num: " + i))
                                 .thenRun(() -> subscription.request(1L));
            }

            @Override
            public void onError(Throwable t) {
                // error
            }

            @Override
            public void onComplete() {
                // complete
            }
        });
出力結果
0
1
2
3
num: 0
num: 1
num: 2
4
num: 3
5
6
num: 4
7
num: 5
8
9
num: 6
num: 7
num: 8
num: 9

流量制限が効かない件……

以下のようなコードを書いた時に流量制限がかからずに困った。

final var scheduler = Schedulers.from(Executors.newScheduledThreadPool(10));
Flowable.fromIterable(IntStream.range(0, 10)
                               .boxed()
                               .peek(System.out::println)::iterator)
        .subscribeOn(scheduler)
        .observeOn(scheduler, false, 3)
        .blockingSubscribe(i -> {
            Thread.sleep(1000);
            System.out.println("num: " + i);
        });
出力結果
0
1
2
3
4
5
6
7
8
9
num: 0
num: 1
num: 2
num: 3
num: 4
num: 5
num: 6
num: 7
num: 8
num: 9

「Flowable 使えばよしなにやってくるんじゃ……?」と思っていたのだが、 Java doc の Backpressure の説明を読んでいると以下のような記載があった。

Backpressure:
The operator consumes the current Flowable in an unbounded manner (i.e., no backpressure applied to it).

Oh……😓
Flowable 使っておきながら流量制限掛けないシチュなんてない気がするし、デフォルトで buffer sizeが設定されていても良い気がするのだが……。
しかも、非同期実行の subscribe() は Backpressure が効かないメソッドしかないという。

あるいは Flowable#buffer とかで流量制限ができないかとも期待したのだが、

Backpressure:
The operator honors backpressure from downstream and expects the current Flowable to honor it as well, although not enforced; violation may lead to MissingBackpressureException somewhere downstream.

こいつ自体には流量制限をどうにかする機能はないようだ。

final var scheduler = Schedulers.from(Executors.newScheduledThreadPool(10));
Flowable.fromIterable(IntStream.range(0, 10)
                               .boxed()
                               .peek(System.out::println)::iterator)
        .subscribeOn(scheduler)
        .onBackpressureBuffer(3)
        .blockingSubscribe(e -> {
            Thread.sleep(1000);
            System.out.println("num: " + e);
        });

BackpressureStrategy

普通に Flowable を使用すれば流量制限が掛かるので過剰にメモリを使うことはないのだが、OOMを回避するだけであれば他の方法もある。

例えば上流側は常にイベントをいろいろ発行し続け、下流側は最新のイベントだけ処理できれば十分なケースの場合、
BackpressureStrategy で流量制限の方針を変更すれば良い。
ちなみに下流の処理待ちのバッファサイズは Flowable#observeOn で指定できる。

final var scheduler = Schedulers.from(Executors.newScheduledThreadPool(10));
Flowable.create(emitter -> {
            try {
                for (int i = 0; i < 10; i++) {
                    emitter.onNext(i);
                }
                emitter.onComplete();
            } catch (Throwable t) {
                emitter.onError(t);
            }
        }, BackpressureStrategy.LATEST)
        .subscribeOn(scheduler)
        .blockingSubscribe(e -> {
            Thread.sleep(1000);
            System.out.println(e);
        }, 3);
出力結果
0
1
2
9

BackpressureStrategy で指定可能なものは以下の5つである。

enum定数 バッファが溢れる場合
DROP 破棄
LATEST 新しいものを残す
ERROR 下流で例外を発生させる
BUFFER 無制限にバッファリング
MISSING 未定義。onBackpressureXXXで自分で動作を定義する

ObserveOn のキャッシュ

今までの例では、ObserveOn と subscribe で指定するバッファサイズを両方指定して変わる意図はなかったが、両者は別の値のため、例えば前述のコードで以下のように両方指定すると実行結果が変化する。

final var scheduler = Schedulers.from(Executors.newScheduledThreadPool(10));
Flowable.create(emitter -> {
            try {
                for (int i = 0; i < 10; i++) {
                    emitter.onNext(i);
                }
                emitter.onComplete();
            } catch (Throwable t) {
                emitter.onError(t);
            }
        }, BackpressureStrategy.LATEST)
        .subscribeOn(scheduler)
        .observeOn(scheduler, false, 3)
        .blockingSubscribe(e -> {
            Thread.sleep(1000);
            System.out.println(e);
        }, 3);
出力結果
0
1
2
3
4
5
9

上流で下流のバッファを監視

Flowable#observeOn で指定したバッファは、上流側では以下のようにバッファの空き状況を取得することもできる。
これを利用して、バッファに空きがない場合に処理を空ぶることで、過度に下流の流さないということも可能である。

final var scheduler = Schedulers.from(Executors.newScheduledThreadPool(10));
Flowable.create(emitter -> {
            try {
                int i = 0;
                while (true) {
                    if (i == 10) {
                        break;
                    }
                    if (emitter.requested() > 0) {
                        emitter.onNext(i);
                        i++;
                    }
                }
                emitter.onComplete();
            } catch (Throwable t) {
                emitter.onError(t);
            }
        }, BackpressureStrategy.LATEST)
        .subscribeOn(scheduler)
        .blockingSubscribe(e -> {
            Thread.sleep(1000);
            System.out.println(e);
        }, 3);
出力結果
0
1
2
3
4
5
6
7
8
9
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?