1) Flux<List<String>>
→ Mono<List> (フラット化)
Flux<List<String>> flux = Flux.just(
List.of("A","B"),
List.of("C"),
List.of("D","E")
);
flux.reduce(new ArrayList<String>(), (acc, list) -> {
acc.addAll(list);
return acc;
}).subscribe(result -> System.out.println("reduce: " + result));
-
reduce()
を使い、複数のList<String>
を 1つのリストに結合(フラット化) [[A,B], [C], [D,E]] → [A, B, C, D, E]
結果
reduce: [A, B, C, D, E]
2) Flux<List<List<String>>>
→ Flux (2段階フラット化)
Flux<List<List<String>>> flux2 = Flux.just(
List.of(List.of("A","B"), List.of("C")),
List.of(List.of("D","E"), List.of("F"))
);
flux2.flatMapIterable(innerLists -> innerLists)
.flatMapIterable(list -> list)
.subscribe(System.out::println);
- 2重リストを2段階で展開して
String
までフラット化 [[A,B],[C]] → [A,B,C]
結果
A
B
C
D
E
F
Flux → Mono → Flux
へ行って戻る例
Flux<String> flux = Flux.just("A","B","C");
Mono<List<String>> monoList = flux.collectList(); // Flux → Mono<List<String>>
Flux<String> backToFlux = monoList.flatMapMany(Flux::fromIterable); // Mono<List<String>> → Flux<String>
backToFlux.subscribe(System.out::println);
結果
A
B
C
👉 一度 List
にまとめてから、再び Flux として流すときに便利
- 例:Fluxを一旦収集してソート/フィルター後に再度Fluxへ
Flux<List<String>> → Mono<List<String>> → Flux<String>
Flux<List<String>> fluxOfLists = Flux.just(
List.of("A","B"),
List.of("C"),
List.of("D","E")
);
Mono<List<String>> monoList = fluxOfLists
.flatMapIterable(list -> list) // フラット化
.collectList(); // Mono<List<String>>
Flux<String> processedFlux = monoList
.map(list -> list.stream().map(v -> v + "!").toList()) // 加工
.flatMapMany(Flux::fromIterable); // 再びFluxに戻す
processedFlux.subscribe(System.out::println);
結果
A!
B!
C!
D!
E!
👉 Flux<List> → フラット化 → 加工 → 再Flux化のパターン
Mono<List<String>> → Flux<String> → Mono<List<String>>
Mono<List<String>> monoList = Mono.just(List.of("A","B","C"));
Mono<List<String>> result = monoList
.flatMapMany(Flux::fromIterable) // 展開
.map(v -> v + "!")
.collectList(); // 再度まとめる
result.subscribe(System.out::println);
結果
[A!, B!, C!]
👉 Mono 内のリストを展開し、加工してから再度まとめる
Mono<List<String>> → Flux<List<String>> → Mono<List<String>>
Mono<List<String>> monoList = Mono.just(List.of("A","B","C"));
Mono<List<String>> result = monoList
.flatMapMany(list -> Flux.just(list)) // Flux<List<String>>
.reduce(new ArrayList<>(), (acc, subList) -> {
acc.addAll(subList);
return acc;
});
result.subscribe(System.out::println);
結果
[A, B, C]
👉 チャンクに分割することも可能だが、ここではそのままリストを1回経由して再結合
Flux<List<String>>
+ Mono<List<Integer>>
zipWith 結合
Flux<List<String>> fluxList = Flux.just(
List.of("A","B"),
List.of("C"),
List.of("D","E")
);
Mono<List<Integer>> monoInts = Mono.just(List.of(1,2,3,4,5));
// zipWith + repeat → Flux全体とマッチ
Flux<String> zippedFlux = fluxList
.zipWith(monoInts.repeat())
.map(tuple -> tuple.getT1() + "::" + tuple.getT2());
結果
[A, B]::[1, 2, 3, 4, 5]
[C]::[1, 2, 3, 4, 5]
[D, E]::[1, 2, 3, 4, 5]
さらにフラット化→マッチングしてから再Monoへまとめる例:
Mono<List<String>> combinedMono = fluxList
.zipWith(monoInts.repeat())
.flatMap(tuple -> Flux.fromIterable(tuple.getT1())
.zipWithIterable(tuple.getT2())
.map(pair -> pair.getT1() + "-" + pair.getT2())
)
.collectList(); // Mono<List<String>>
結果
最終結果: [A-1, B-2, C-1, D-1, E-2]
Mono を毎回新しく呼びたい場合
Flux.just("A","B","C")
.flatMap(v -> Mono.defer(() -> {
System.out.println("API call: " + v);
return Mono.just(v + "-result");
}))
.subscribe(System.out::println);
結果
API call: A
A-result
API call: B
B-result
API call: C
C-result
Flux<String>
+ Mono<String>
zipWith の動作の違い
Flux<String> flux = Flux.just("A","B","C");
Mono<String> mono = Mono.just("X");
flux.zipWith(mono) // 最初の要素のみマッチ
.subscribe(t -> System.out.println(t.getT1() + "-" + t.getT2()));
flux.zipWith(mono.repeat()) // repeat → Flux全体にマッチ
.subscribe(t -> System.out.println(t.getT1() + "-" + t.getT2()));
結果
A-X (monoは1回だけemit → 最初の要素のみマッチ)
A-X
B-X
C-X (repeat() すると値を再放出)
👉 Mono は1回しか emit しないので zip するとFluxの最初の要素しかマッチしない → Flux全体にマッチさせるには repeat() が必要
✅ これらの例が示す主要パターン
シナリオ | 主要演算 |
---|---|
Flux<List<T>> → Mono<List<T>>
|
reduce() または flatMapIterable + collectList()
|
Flux<T> → Mono<List<T>> → Flux<T>
|
collectList() + flatMapMany()
|
Mono<List<T>> → Flux<T> → Mono<List<T>>
|
flatMapMany() + collectList()
|
Mono<List<T>> → Flux<List<T>> → Mono<List<T>>
|
flatMapMany(list -> Flux.just(list)) + reduce() |
Flux + Mono zip |
zipWith(mono.repeat()) |
Monoを毎回新規実行 | Mono.defer() |