5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

CompletableFuture手始め3 (非同期リクエストを発行してみる)

Posted at

CompletableFuture手始め2 (CompletableFutureを作ってみる)
の続き

非同期処理のリクエストを複数発行する

以下のようなStringのリストがあり、これら一つ一つを非同期処理の引数とする。

private static final List<String> argsList 
= Arrays.asList("test1", "test2", "test3", "test4");

Streamを使って、非同期処理を行う。

public static List<String> getDoubles() {
    List<CompletableFuture<String>> doubleFutures = argsList.stream()
            // ファクトリメソッドを使ってCompletableFutureを作成
            .map(arg -> CompletableFuture.supplyAsync(
                    () -> String.format("value: %f", doSomeLongComputation(arg))
            ))
            .collect(Collectors.toList());

    List<String> strs = doubleFutures.stream()
            // joinを使用して、CompletableFutureの結果を取得
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    return strs;
}

特筆すべき点は2点。
CompletableFuture手始め2で記載した、
CompletableFuture.supplyAsyncファクトリーメソッドを使い、
CompletableFutureのリストを作成している。

加えて、作成したCompletableFutureのリストから処理結果を取得する際に、
CompletableFuture.joinメソッドを使用している。

getメソッドとjoinメソッドの違いは下記ページの解説が詳しい。
completablefuture join vs get
端的に言うと、joinメソッドを使用する場合は、例外に対して明示的にtry-catch文を書く必要がなくなる。
(但し、例外が飛ばないわけではないので、ケアをする必要はある)

連続する非同期処理の記述

public static List<Double> getDoubleByTimes() {
    List<CompletableFuture<Double>> futures = argsList.stream() 
            .map(arg -> CompletableFuture.supplyAsync(
                    () -> doSomeLongComputation(arg)
            )) // Stream<CompletableFuture>を生成
            // 一つ目のCompletableFutureの結果を、2つ目のCompletableFutureへ渡す
            .map(future -> future.thenCompose(value ->
                    CompletableFuture.supplyAsync(
                            () -> timeLongComputation(value)))) 
            .collect(toList());
    return futures.stream()
            .map(CompletableFuture::join)
            .collect(toList());
}

処理の流れとしては以下の通り

List<String> // stream()
↓
Stream<CompletableFuture<Double>> // CompletableFuture.supplyAsync
↓
Stream<CompletableFuture<Double>> // CompletableFuture.thenCompose
↓
List<CompletableFuture<Double>> // toList()

thenComposeを使うことで、
2つの異なるCompletableFutureをカスケード的に使用することができる。

thenCompose(Function super T,? extends CompletionStage> fn)

このステージが正常に完了したときに、
このステージを指定された関数への引数に設定して実行される新しいCompletionStageを返します。
thenCompose@Oracle

最初のCompletableFuturethenComposeを呼び出し、
その結果を関数に渡す。
この関数は、最初のCompletableFutureが返す値を引数とし、
その引数を使用した二つ目のCompletableFuture内で計算した値を戻り値として返す。

notes ( thenApplyとthenComposeの違い )

類似の関数としてthenApplyがある。

thenApply(Function super T,? extends U> fn)
このステージが正常に完了したときに、このステージの結果を指定された関数への引数に設定して実行される新しいCompletionStageを返します
thenApply@Oracle

CompletableFuture | thenApply vs thenCompose

上のStackOverFlowの説明が詳しい。
特に以下の回答がとても参考になった。

thenApply() returned the nested futures as they were,
but thenCompose() flattened the nested CompletableFutures so that it is easier to chain more method calls to it.

上述のコード内でthenApplyを使うと、その戻り値が、
CompletableFuture<CompletableFuture<Double>>のようになり、
CompletableFutureがネストされるようになる。

List<String> // stream()
↓
Stream<CompletableFuture<Double>> // CompletableFuture.supplyAsync
↓
Stream<CompletableFuture<CompletableFuture<Double>>> // CompletableFuture.thenApply
↓
List<CompletableFuture<CompletableFuture<Double>>> // toList()

Java9のドキュメントでは、類似する概念として以下の例が挙げられている。
CompletableFuture.thenApply ⇄ Stream.map
CompletableFuture.thenCompose ⇄ Stream.flatMap

mapとflatMapについては、
Streamにおけるmapとflatmapの理解へ向けて(1)
で、少しまとめてみた。

2つの独立したCompletableFutureの結果を使用する場合

thenCombine(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn)

このステージと指定された他のステージの両方が正常終了した際に実行される新しいCompletionStageを返します
(実行時には、指定された関数の引数として2つの結果が使用される)。

List<CompletableFuture<Double>> futures = argsList.stream()
        .map(arg -> CompletableFuture.supplyAsync(
                () -> doSomeLongComputation(arg)
        ))
        .map(future -> future.thenCombine(
                CompletableFuture.supplyAsync(
                        () -> doSomeLongComputation("test")),
                (resultFromFirstFuture, resultFromSecondsFuture) -> resultFromFirstFuture * resultFromSecondsFuture
        ))
        .collect(toList());

return futures.stream()
        .map(CompletableFuture::join)
        .collect(toList());

notes: thenCombine / thenCombineAsync

Asyncの接尾辞がつく場合、
2つ目の引数のBiFunctionは、スレッドプールに渡され、
異なるタスクとして非同期で実行されるようになる。

reference

Java8 In Action

5
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?