0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

CompletableFutureを使って非同期処理をやってみた

Last updated at Posted at 2023-04-10

今日も備忘録です。

このドキュメントについて

javaではthreadクラスなどを用いると、以下2つの難点がありました。

  1. スレッド:スレッド内で行った処理をその後で引き継げない
  2. future:getメソッドの呼び出しがスレッドをブロックしてしまう

今回は上記を解消できる、java8から登場したCompletableFutureクラスについて簡単に機能をまとめてみます。

実行環境

・Java17
・springboot 3.0.2

実装

①:4つのメソッドを利用して、実践!

        var use1 = CompletableFuture.supplyAsync(
                () -> {
                        try{
                            Thread.sleep(1000);
                        } catch(InterruptedException e) {
                            throw new RuntimeException("割り込み", e);
                        }
                        log.info("use1終わり");
                        return "use1";
                });

        var use2 = use1.thenRunAsync(new commonTask(), threadPoolTaskExecutor);

        var use3 = use1.thenApplyAsync(
                value -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException("割り込み", e);
                    }
                    log.info("処理終わり→" + Thread.currentThread().getId() + Thread.currentThread().getName());
                    return value + "+use3";
                }, threadPoolTaskExecutor);

        use2.thenCombine(use3, (a,b) -> a + b)
                .thenAccept(log::info);

        log.info("メインスレッドは終わりです");

上記で利用しているthreadPoolTaskExecutorの定義については以下。

    @Bean(name = "Thread2")
    ThreadPoolTaskExecutor configThreadPool2() {
        var threadPool = new ThreadPoolTaskExecutor();
        threadPool.setCorePoolSize(2);
        threadPool.setMaxPoolSize(5);
        threadPool.setKeepAliveSeconds(0);
        threadPool.setQueueCapacity(500);
        threadPool.setDaemon(true);
        threadPool.setThreadNamePrefix("Thread2-");
        threadPool.initialize();
        return threadPool;
    }

説明:
CompletableFutureは「1段終われば、次の段、その次の段が終われば、また次の段」というふうに階段のような処理ができるイメージを僕はしてます。
また、上記実装は非同期で行っているので、「メインスレッドは終わりです」が1番最初にログに記載されます。

        var use1 = CompletableFuture.supplyAsync(
                () -> {
                        try{
                            Thread.sleep(1000);
                        } catch(InterruptedException e) {
                            throw new RuntimeException("割り込み", e);
                        }
                        log.info("use1終わり");
                        return "use1";
                });

use1:
use1にて、 CompletableFuture.supplyAsync()を利用して、非同期で処理を行いつつ、CompletableFutureのインスタンスを返却します。


var use2 = use1.thenRunAsync(new commonTask(), threadPoolTaskExecutor);

use2:
use2にて、CompletableFuture.thenRunAsync()では、use1の処理が終わらないと実行されず、非同期で処理されます。
thenRunAsync()は第一引数に、Runnableインターフェースが入り、第二引数にはどのExecutorで実施したいかを設定できます。(設定しない場合はデフォルトが利用されるようです。)

        var use3 = use1.thenApplyAsync(
                value -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException("割り込み", e);
                    }
                    log.info("処理終わり→" + Thread.currentThread().getId() + Thread.currentThread().getName());
                    return value + "+use3";
                }, threadPoolTaskExecutor);

use3:
use3も同様、CompletableFuture.thenApplyAsync()では、use1の処理が終わらないと実行されず、非同期で処理されます。
use2とは違い、第一引数にFunction型を受け取るので、use1の返却値を利用することができます。
第二引数にはどのExecutorで実施したいかを設定できます。(設定しない場合はデフォルトが利用されるようです。)

        use2.thenCombine(use3, (a,b) -> a + b)
               .thenAccept(log::info);

最後:
CompletableFuture.thenCombine()では、use2とuse3の処理がどちらも完了した時に、それぞれの返却値を受け取って処理を行うことができ、処理した結果を返却します。
なので、第二引数はBiFunction型を受け取ります。

thenAccept()は、引数にConsumer型を受け取るので、上記で受け取った値をlogに残すといった処理になります。

②:その他メソッドについて

簡単にその他のメソッドについても記載しておきます。

acceptEitherAsync()

var use7 = use5.acceptEitherAsync(use6, log::info,threadPoolTaskExecutor);

use5もしくはuse6どちらか先に処理が終わった方で処理を行う。


runAfterBothAsync()

var use9 = use7.runAfterBothAsync(use8, Runnable, threadPoolTaskExecutor);

use7とuse8どちらも処理が終わった後に、Runnableインターフェースのrunメソッドが非同期で実行される。

エラーのハンドリングに使えるメソッド

・handleAsync()
・whenCompleteAsync()
・exceptionally()
など


他にもいろんな用途のメソッドが用意されているので、気になる場合は下記参照。

まとめ

非同期処理の結果を再利用したい場合は、CompletableFutureを使うのが良さそうです。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?