Reactor には Flux.expand という「値を再帰的に展開していく」強力なオペレーターがあります。
この記事では、この expand を使って カーソルベースの逐次処理(cursor=1 → 2 → … → 10) を実装し、さらに flatMap(concurrency) を組み合わせて 非同期処理を順次または制御された並行数で実行するパターンを紹介します。
また、Flux 全体の実行時間(duration)を測定する方法もまとめています。
📌 実装コード
テストコード(実行時間をログ出力)
@Test
void test9(){
long start = System.currentTimeMillis();
cursorFluxWithExpandAndFlatMap()
.doOnNext(c -> System.out.println("emit cursor = " + c))
.doFinally(signal -> {
long end = System.currentTimeMillis();
System.out.println("### Total Duration = " + (end - start) + "ms, signal=" + signal);
})
.blockLast();
}
-
doFinallyは 完了 / エラー / キャンセル いずれの場合でも必ず最後に呼ばれるため、
全処理の duration を測りたい場合に最適です。 -
blockLast()で Flux の完了を待ちます。
⏱ 非同期で次のカーソルを取得する処理
private static Mono<Integer> fetchNextCursorAsync(int current){
return Mono.defer(() -> {
System.out.println("[async] current = " + current);
return Mono.just(current + 1); // 次のカーソルへ
})
.delayElement(Duration.ofMillis(100)) // 遅延を入れて非同期I/Oっぽくする
.subscribeOn(Schedulers.boundedElastic());
}
ポイント:
-
Mono.deferを使うことで subscribeされるまで処理を実行しない(遅延評価) -
delayElementは I/O待ちなどを想定した遅延 -
boundedElasticは「ブロッキング or 重い処理」を実行したいときの標準スケジューラ
🔄 expand + flatMap(concurrency) でカーソル連鎖処理
public static Flux<Integer> cursorFluxWithExpandAndFlatMap(){
return Flux.just(1)
.expand(cursor -> {
if(cursor >= 10) return Flux.empty(); // 再展開終了
return Flux.just(cursor)
.flatMap(WebFluxApplicationTests::fetchNextCursorAsync, 3);
// concurrency = 3 → 最大3つの非同期処理を並行実行
});
}
✔ どう動くのか?
-
Flux.just(1)でカーソル開始 -
expandが 1 → 2 → 3 → … → 10 を「再帰的に展開」 -
各ステップで
flatMap(fetchNextCursorAsync, concurrency = 3)を実行
→ 最大3つまで同時に非同期処理を走らせつつ、expand の構造で順次展開される
concurrency を 1 にすると?
.flatMap(WebFluxApplicationTests::fetchNextCursorAsync, 1)
- 完全に 直列処理(順次実行) になります。
- API のページングなど「前ページのカーソルを受け取って次を取りに行く」ケースでよく使う形。
📝 完全実行ログのイメージ
例として一部抜粋:
[async] current = 1
emit cursor = 1
[async] current = 2
emit cursor = 2
...
### Total Duration = 1043ms, signal=ON_COMPLETE
expand によって 1 → 2 → 3 と展開され続け、最後に実行時間がログ出力されます。
🚀 expand + flatMap(concurrency) を使うメリット
| 機能 | メリット |
|---|---|
| expand | 再帰を使わずに「次の値を返すとさらに展開される」処理を書ける |
| flatMap(concurrency) | 非同期処理の並行数を制御できる |
| doFinally | Flux 全体の実行時間を正確に測定できる |
特にカーソル型APIに最適
- 「ページネーションAPI」「nextCursor 方式」「ストリーミング取得」など
- 毎回 API を呼び出しながら次の cursor を取得する、という処理に非常に適しています。