reactorを使ってblockingな処理をマルチスレッドで行って結果をまとめるような処理をやりたい。具体的な実装方法はマニュアルに記載がある。
https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
ただし、実際にこの方法で実装すると、Schedulers.elastic()
がひとおもいにthreadを生成してしまうために、リソースを食いつぶしてしまう可能性がある。
public static void main(String[] args) {
final var webClient = WebClient.builder()
.baseUrl("http://example.com")
.build();
Flux.fromStream(IntStream.range(1, 100).boxed())
.flatMap(i -> Mono.fromCallable(() -> webClient.get()
.retrieve()
.bodyToMono(String.class)
.block()) // 説明のためあえてblockしてる
.subscribeOn(Schedulers.elastic()))
.blockLast();
}
実行ログ
・
・
02:24:57.014 [elastic-77] DEBUG o.s.w.r.f.client.ExchangeFunctions - [11ca9c18] HTTP GET http://example.com
02:24:57.017 [elastic-15] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ee270fc] HTTP GET http://example.com
02:24:57.017 [elastic-61] DEBUG o.s.w.r.f.client.ExchangeFunctions - [272cc048] HTTP GET http://example.com
02:24:57.015 [elastic-45] DEBUG o.s.w.r.f.client.ExchangeFunctions - [12f4ca28] HTTP GET http://example.com
02:24:57.014 [elastic-17] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3d44ed66] HTTP GET http://example.com
02:24:57.017 [elastic-57] DEBUG o.s.w.r.f.client.ExchangeFunctions - [6b5899a3] HTTP GET http://example.com
02:24:57.017 [elastic-92] DEBUG o.s.w.r.f.client.ExchangeFunctions - [7ec595f3] HTTP GET http://example.com
02:24:57.015 [elastic-94] DEBUG o.s.w.r.f.client.ExchangeFunctions - [70f26d87] HTTP GET http://example.com
・
・
public static void main(String[] args) {
final var webClient = WebClient.builder()
.baseUrl("http://example.com")
.build();
Flux.fromStream(IntStream.range(1, 100).boxed())
.flatMap(i -> Mono.fromCallable(() -> webClient.get()
.retrieve()
.bodyToMono(String.class)
.block()) // 説明のためあえてblockしてる
.subscribeOn(Schedulers.elastic())
, 10) // concurrencyに10を指定
.blockLast();
}
実行ログ
02:28:06.020 [elastic-4] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b6144e3] HTTP GET http://example.com
02:28:06.020 [elastic-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [64d61eb3] HTTP GET http://example.com
02:28:06.020 [elastic-5] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b00ce18] HTTP GET http://example.com
02:28:06.020 [elastic-2] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] HTTP GET http://example.com
02:28:06.021 [elastic-3] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] HTTP GET http://example.com
02:28:06.021 [elastic-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ace12f2] HTTP GET http://example.com
02:28:06.021 [elastic-10] DEBUG o.s.w.r.f.client.ExchangeFunctions - [4135ca0a] HTTP GET http://example.com
02:28:06.021 [elastic-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [badf622] HTTP GET http://example.com
02:28:06.020 [elastic-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] HTTP GET http://example.com
02:28:06.021 [elastic-11] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] HTTP GET http://example.com
02:28:06.673 [reactor-http-nio-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] Response 200 OK
02:28:06.673 [reactor-http-nio-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] Response 200 OK
02:28:06.687 [reactor-http-nio-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] Response 200 OK
02:28:06.757 [reactor-http-nio-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] Response 200 OK
・
・
elastic schedulerのthreadの数が10個に制限されている。
実際には、flatMap()
を呼ぶ場合にでも、内部的にconcurrencyが指定されている。ただ、その値が大きいために制限がないようなログに見えていた。
また、subscribeOn
にはExecutorも指定可能なので、細かいアルゴリズムの違いはあるがFixedThreadPoolやWorkStealingPool(ForkJoinPool)を使って同様の処理をすることが可能。.subscribeOn(Schedulers.elastic())
の代わりに.subscribeOn(Schedulers.fromExecutor(executorService)))
を指定すれば良い。
また、application停止時にこれらのworker threadを確実にshutdownしたい場合にはExecutorServiceを利用することが有効だと思われる。