0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Reactor実戦パターン整理

Posted at

1) Flux<List<String>>Mono<List> (フラット化)

Flux<List<String>> flux = Flux.just(
    List.of("A","B"),
    List.of("C"),
    List.of("D","E")
);

flux.reduce(new ArrayList<String>(), (acc, list) -> {
    acc.addAll(list);
    return acc;
}).subscribe(result -> System.out.println("reduce: " + result));
  • reduce() を使い、複数の List<String>1つのリストに結合(フラット化)
  • [[A,B], [C], [D,E]] → [A, B, C, D, E]

結果

reduce: [A, B, C, D, E]

2) Flux<List<List<String>>>Flux (2段階フラット化)

Flux<List<List<String>>> flux2 = Flux.just(
    List.of(List.of("A","B"), List.of("C")),
    List.of(List.of("D","E"), List.of("F"))
);

flux2.flatMapIterable(innerLists -> innerLists)
     .flatMapIterable(list -> list)
     .subscribe(System.out::println);
  • 2重リストを2段階で展開して String までフラット化
  • [[A,B],[C]] → [A,B,C]

結果

A
B
C
D
E
F

Flux → Mono → Flux へ行って戻る例

Flux<String> flux = Flux.just("A","B","C");

Mono<List<String>> monoList = flux.collectList();         // Flux → Mono<List<String>>
Flux<String> backToFlux = monoList.flatMapMany(Flux::fromIterable); // Mono<List<String>> → Flux<String>

backToFlux.subscribe(System.out::println);

結果

A
B
C

👉 一度 List にまとめてから、再び Flux として流すときに便利

  • 例:Fluxを一旦収集してソート/フィルター後に再度Fluxへ

Flux<List<String>> → Mono<List<String>> → Flux<String>

Flux<List<String>> fluxOfLists = Flux.just(
    List.of("A","B"),
    List.of("C"),
    List.of("D","E")
);

Mono<List<String>> monoList = fluxOfLists
    .flatMapIterable(list -> list)   // フラット化
    .collectList();                  // Mono<List<String>>

Flux<String> processedFlux = monoList
    .map(list -> list.stream().map(v -> v + "!").toList()) // 加工
    .flatMapMany(Flux::fromIterable); // 再びFluxに戻す

processedFlux.subscribe(System.out::println);

結果

A!
B!
C!
D!
E!

👉 Flux<List> → フラット化 → 加工 → 再Flux化のパターン


Mono<List<String>> → Flux<String> → Mono<List<String>>

Mono<List<String>> monoList = Mono.just(List.of("A","B","C"));

Mono<List<String>> result = monoList
    .flatMapMany(Flux::fromIterable) // 展開
    .map(v -> v + "!")
    .collectList();                   // 再度まとめる

result.subscribe(System.out::println);

結果

[A!, B!, C!]

👉 Mono 内のリストを展開し、加工してから再度まとめる


Mono<List<String>> → Flux<List<String>> → Mono<List<String>>

Mono<List<String>> monoList = Mono.just(List.of("A","B","C"));

Mono<List<String>> result = monoList
    .flatMapMany(list -> Flux.just(list)) // Flux<List<String>>
    .reduce(new ArrayList<>(), (acc, subList) -> { 
        acc.addAll(subList);
        return acc;
    });

result.subscribe(System.out::println);

結果

[A, B, C]

👉 チャンクに分割することも可能だが、ここではそのままリストを1回経由して再結合


Flux<List<String>> + Mono<List<Integer>> zipWith 結合

Flux<List<String>> fluxList = Flux.just(
    List.of("A","B"),
    List.of("C"),
    List.of("D","E")
);

Mono<List<Integer>> monoInts = Mono.just(List.of(1,2,3,4,5));

// zipWith + repeat → Flux全体とマッチ
Flux<String> zippedFlux = fluxList
    .zipWith(monoInts.repeat()) 
    .map(tuple -> tuple.getT1() + "::" + tuple.getT2());

結果

[A, B]::[1, 2, 3, 4, 5]
[C]::[1, 2, 3, 4, 5]
[D, E]::[1, 2, 3, 4, 5]

さらにフラット化→マッチングしてから再Monoへまとめる例:

Mono<List<String>> combinedMono = fluxList
    .zipWith(monoInts.repeat())
    .flatMap(tuple -> Flux.fromIterable(tuple.getT1())
        .zipWithIterable(tuple.getT2())
        .map(pair -> pair.getT1() + "-" + pair.getT2())
    )
    .collectList(); // Mono<List<String>>

結果

最終結果: [A-1, B-2, C-1, D-1, E-2]

Mono を毎回新しく呼びたい場合

Flux.just("A","B","C")
    .flatMap(v -> Mono.defer(() -> {
        System.out.println("API call: " + v);
        return Mono.just(v + "-result");
    }))
    .subscribe(System.out::println);

結果

API call: A
A-result
API call: B
B-result
API call: C
C-result

Flux<String> + Mono<String> zipWith の動作の違い

Flux<String> flux = Flux.just("A","B","C");
Mono<String> mono = Mono.just("X");

flux.zipWith(mono)          // 最初の要素のみマッチ
    .subscribe(t -> System.out.println(t.getT1() + "-" + t.getT2()));

flux.zipWith(mono.repeat()) // repeat → Flux全体にマッチ
    .subscribe(t -> System.out.println(t.getT1() + "-" + t.getT2()));

結果

A-X        (monoは1回だけemit → 最初の要素のみマッチ)
A-X
B-X
C-X        (repeat() すると値を再放出)

👉 Mono は1回しか emit しないので zip するとFluxの最初の要素しかマッチしない → Flux全体にマッチさせるには repeat() が必要


✅ これらの例が示す主要パターン

シナリオ 主要演算
Flux<List<T>>Mono<List<T>> reduce() または flatMapIterable + collectList()
Flux<T>Mono<List<T>>Flux<T> collectList() + flatMapMany()
Mono<List<T>>Flux<T>Mono<List<T>> flatMapMany() + collectList()
Mono<List<T>>Flux<List<T>>Mono<List<T>> flatMapMany(list -> Flux.just(list)) + reduce()
Flux + Mono zip zipWith(mono.repeat())
Monoを毎回新規実行 Mono.defer()
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?