0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

reactorでblockingな処理をマルチスレッドで実行する

Last updated at Posted at 2019-07-26

reactorを使ってblockingな処理をマルチスレッドで行って結果をまとめるような処理をやりたい。具体的な実装方法はマニュアルに記載がある。
https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
ただし、実際にこの方法で実装すると、Schedulers.elastic()がひとおもいにthreadを生成してしまうために、リソースを食いつぶしてしまう可能性がある。

    public static void main(String[] args) {
        final var webClient = WebClient.builder()
                                       .baseUrl("http://example.com")
                                       .build();
        Flux.fromStream(IntStream.range(1, 100).boxed())
            .flatMap(i -> Mono.fromCallable(() -> webClient.get()
                                                           .retrieve()
                                                           .bodyToMono(String.class)
                                                           .block()) // 説明のためあえてblockしてる
                              .subscribeOn(Schedulers.elastic()))
            .blockLast();
    }

実行ログ

・
・
02:24:57.014 [elastic-77] DEBUG o.s.w.r.f.client.ExchangeFunctions - [11ca9c18] HTTP GET http://example.com
02:24:57.017 [elastic-15] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ee270fc] HTTP GET http://example.com
02:24:57.017 [elastic-61] DEBUG o.s.w.r.f.client.ExchangeFunctions - [272cc048] HTTP GET http://example.com
02:24:57.015 [elastic-45] DEBUG o.s.w.r.f.client.ExchangeFunctions - [12f4ca28] HTTP GET http://example.com
02:24:57.014 [elastic-17] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3d44ed66] HTTP GET http://example.com
02:24:57.017 [elastic-57] DEBUG o.s.w.r.f.client.ExchangeFunctions - [6b5899a3] HTTP GET http://example.com
02:24:57.017 [elastic-92] DEBUG o.s.w.r.f.client.ExchangeFunctions - [7ec595f3] HTTP GET http://example.com
02:24:57.015 [elastic-94] DEBUG o.s.w.r.f.client.ExchangeFunctions - [70f26d87] HTTP GET http://example.com
・
・
    public static void main(String[] args) {
        final var webClient = WebClient.builder()
                                       .baseUrl("http://example.com")
                                       .build();

        Flux.fromStream(IntStream.range(1, 100).boxed())
            .flatMap(i -> Mono.fromCallable(() -> webClient.get()
                                                           .retrieve()
                                                           .bodyToMono(String.class)
                                                           .block()) // 説明のためあえてblockしてる
                              .subscribeOn(Schedulers.elastic())
                    , 10) // concurrencyに10を指定
            .blockLast();
    }

実行ログ

02:28:06.020 [elastic-4] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b6144e3] HTTP GET http://example.com
02:28:06.020 [elastic-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [64d61eb3] HTTP GET http://example.com
02:28:06.020 [elastic-5] DEBUG o.s.w.r.f.client.ExchangeFunctions - [1b00ce18] HTTP GET http://example.com
02:28:06.020 [elastic-2] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] HTTP GET http://example.com
02:28:06.021 [elastic-3] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] HTTP GET http://example.com
02:28:06.021 [elastic-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [3ace12f2] HTTP GET http://example.com
02:28:06.021 [elastic-10] DEBUG o.s.w.r.f.client.ExchangeFunctions - [4135ca0a] HTTP GET http://example.com
02:28:06.021 [elastic-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [badf622] HTTP GET http://example.com
02:28:06.020 [elastic-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] HTTP GET http://example.com
02:28:06.021 [elastic-11] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] HTTP GET http://example.com
02:28:06.673 [reactor-http-nio-8] DEBUG o.s.w.r.f.client.ExchangeFunctions - [2dfed701] Response 200 OK
02:28:06.673 [reactor-http-nio-9] DEBUG o.s.w.r.f.client.ExchangeFunctions - [504a226f] Response 200 OK
02:28:06.687 [reactor-http-nio-7] DEBUG o.s.w.r.f.client.ExchangeFunctions - [590d0628] Response 200 OK
02:28:06.757 [reactor-http-nio-6] DEBUG o.s.w.r.f.client.ExchangeFunctions - [753526d8] Response 200 OK
・
・

elastic schedulerのthreadの数が10個に制限されている。

実際には、flatMap()を呼ぶ場合にでも、内部的にconcurrencyが指定されている。ただ、その値が大きいために制限がないようなログに見えていた。

また、subscribeOnにはExecutorも指定可能なので、細かいアルゴリズムの違いはあるがFixedThreadPoolやWorkStealingPool(ForkJoinPool)を使って同様の処理をすることが可能。.subscribeOn(Schedulers.elastic())の代わりに.subscribeOn(Schedulers.fromExecutor(executorService)))を指定すれば良い。
また、application停止時にこれらのworker threadを確実にshutdownしたい場合にはExecutorServiceを利用することが有効だと思われる。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?