0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Reactor Flux.expand

Posted at

Reactor には Flux.expand という「値を再帰的に展開していく」強力なオペレーターがあります。
この記事では、この expand を使って カーソルベースの逐次処理(cursor=1 → 2 → … → 10) を実装し、さらに flatMap(concurrency) を組み合わせて 非同期処理を順次または制御された並行数で実行するパターンを紹介します。

また、Flux 全体の実行時間(duration)を測定する方法もまとめています。

📌 実装コード

テストコード(実行時間をログ出力)

@Test
void test9(){
    long start = System.currentTimeMillis();

    cursorFluxWithExpandAndFlatMap()
            .doOnNext(c -> System.out.println("emit cursor = " + c))
            .doFinally(signal -> {
                long end = System.currentTimeMillis();
                System.out.println("### Total Duration = " + (end - start) + "ms, signal=" + signal);
            })
            .blockLast();
}
  • doFinally完了 / エラー / キャンセル いずれの場合でも必ず最後に呼ばれるため、
    全処理の duration を測りたい場合に最適です。
  • blockLast() で Flux の完了を待ちます。

⏱ 非同期で次のカーソルを取得する処理

private static Mono<Integer> fetchNextCursorAsync(int current){
    return Mono.defer(() -> {
        System.out.println("[async] current = " + current);
        return Mono.just(current + 1);   // 次のカーソルへ
    })
    .delayElement(Duration.ofMillis(100)) // 遅延を入れて非同期I/Oっぽくする
    .subscribeOn(Schedulers.boundedElastic());
}

ポイント:

  • Mono.defer を使うことで subscribeされるまで処理を実行しない(遅延評価)
  • delayElement は I/O待ちなどを想定した遅延
  • boundedElastic は「ブロッキング or 重い処理」を実行したいときの標準スケジューラ

🔄 expand + flatMap(concurrency) でカーソル連鎖処理

public static Flux<Integer> cursorFluxWithExpandAndFlatMap(){
    return Flux.just(1)
            .expand(cursor -> {
                if(cursor >= 10) return Flux.empty();  // 再展開終了

                return Flux.just(cursor)
                        .flatMap(WebFluxApplicationTests::fetchNextCursorAsync, 3);
                        // concurrency = 3 → 最大3つの非同期処理を並行実行
            });
}

✔ どう動くのか?

  1. Flux.just(1) でカーソル開始

  2. expand が 1 → 2 → 3 → … → 10 を「再帰的に展開」

  3. 各ステップで

    flatMap(fetchNextCursorAsync, concurrency = 3)
    

    を実行
    最大3つまで同時に非同期処理を走らせつつ、expand の構造で順次展開される

concurrency を 1 にすると?

.flatMap(WebFluxApplicationTests::fetchNextCursorAsync, 1)
  • 完全に 直列処理(順次実行) になります。
  • API のページングなど「前ページのカーソルを受け取って次を取りに行く」ケースでよく使う形。

📝 完全実行ログのイメージ

例として一部抜粋:

[async] current = 1
emit cursor = 1
[async] current = 2
emit cursor = 2
...
### Total Duration = 1043ms, signal=ON_COMPLETE

expand によって 1 → 2 → 3 と展開され続け、最後に実行時間がログ出力されます。


🚀 expand + flatMap(concurrency) を使うメリット

機能 メリット
expand 再帰を使わずに「次の値を返すとさらに展開される」処理を書ける
flatMap(concurrency) 非同期処理の並行数を制御できる
doFinally Flux 全体の実行時間を正確に測定できる

特にカーソル型APIに最適

  • 「ページネーションAPI」「nextCursor 方式」「ストリーミング取得」など
  • 毎回 API を呼び出しながら次の cursor を取得する、という処理に非常に適しています。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?