CompletableFuture手始め2 (CompletableFutureを作ってみる)
の続き
非同期処理のリクエストを複数発行する
以下のようなString
のリストがあり、これら一つ一つを非同期処理の引数とする。
private static final List<String> argsList
= Arrays.asList("test1", "test2", "test3", "test4");
Stream
を使って、非同期処理を行う。
public static List<String> getDoubles() {
List<CompletableFuture<String>> doubleFutures = argsList.stream()
// ファクトリメソッドを使ってCompletableFutureを作成
.map(arg -> CompletableFuture.supplyAsync(
() -> String.format("value: %f", doSomeLongComputation(arg))
))
.collect(Collectors.toList());
List<String> strs = doubleFutures.stream()
// joinを使用して、CompletableFutureの結果を取得
.map(CompletableFuture::join)
.collect(Collectors.toList());
return strs;
}
特筆すべき点は2点。
CompletableFuture手始め2で記載した、
CompletableFuture.supplyAsync
ファクトリーメソッドを使い、
CompletableFuture
のリストを作成している。
加えて、作成したCompletableFuture
のリストから処理結果を取得する際に、
CompletableFuture.join
メソッドを使用している。
getメソッドとjoinメソッドの違いは下記ページの解説が詳しい。
completablefuture join vs get
端的に言うと、joinメソッドを使用する場合は、例外に対して明示的にtry-catch文を書く必要がなくなる。
(但し、例外が飛ばないわけではないので、ケアをする必要はある)
連続する非同期処理の記述
public static List<Double> getDoubleByTimes() {
List<CompletableFuture<Double>> futures = argsList.stream()
.map(arg -> CompletableFuture.supplyAsync(
() -> doSomeLongComputation(arg)
)) // Stream<CompletableFuture>を生成
// 一つ目のCompletableFutureの結果を、2つ目のCompletableFutureへ渡す
.map(future -> future.thenCompose(value ->
CompletableFuture.supplyAsync(
() -> timeLongComputation(value))))
.collect(toList());
return futures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
処理の流れとしては以下の通り
List<String> // stream()
↓
Stream<CompletableFuture<Double>> // CompletableFuture.supplyAsync
↓
Stream<CompletableFuture<Double>> // CompletableFuture.thenCompose
↓
List<CompletableFuture<Double>> // toList()
thenCompose
を使うことで、
2つの異なるCompletableFutureをカスケード的に使用することができる。
thenCompose(Function super T,? extends CompletionStage> fn)
このステージが正常に完了したときに、
このステージを指定された関数への引数に設定して実行される新しいCompletionStageを返します。
thenCompose@Oracle
最初のCompletableFuture
のthenCompose
を呼び出し、
その結果を関数に渡す。
この関数は、最初のCompletableFuture
が返す値を引数とし、
その引数を使用した二つ目のCompletableFuture
内で計算した値を戻り値として返す。
notes ( thenApplyとthenComposeの違い )
類似の関数としてthenApply
がある。
thenApply(Function super T,? extends U> fn)
このステージが正常に完了したときに、このステージの結果を指定された関数への引数に設定して実行される新しいCompletionStageを返します
thenApply@Oracle
CompletableFuture | thenApply vs thenCompose
上のStackOverFlowの説明が詳しい。
特に以下の回答がとても参考になった。
thenApply() returned the nested futures as they were,
but thenCompose() flattened the nested CompletableFutures so that it is easier to chain more method calls to it.
上述のコード内でthenApply
を使うと、その戻り値が、
CompletableFuture<CompletableFuture<Double>>
のようになり、
CompletableFutureがネストされるようになる。
List<String> // stream()
↓
Stream<CompletableFuture<Double>> // CompletableFuture.supplyAsync
↓
Stream<CompletableFuture<CompletableFuture<Double>>> // CompletableFuture.thenApply
↓
List<CompletableFuture<CompletableFuture<Double>>> // toList()
Java9のドキュメントでは、類似する概念として以下の例が挙げられている。
CompletableFuture.thenApply ⇄ Stream.map
CompletableFuture.thenCompose ⇄ Stream.flatMap
mapとflatMapについては、
Streamにおけるmapとflatmapの理解へ向けて(1)
で、少しまとめてみた。
2つの独立したCompletableFutureの結果を使用する場合
thenCombine(CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn)
このステージと指定された他のステージの両方が正常終了した際に実行される新しいCompletionStageを返します
(実行時には、指定された関数の引数として2つの結果が使用される)。
List<CompletableFuture<Double>> futures = argsList.stream()
.map(arg -> CompletableFuture.supplyAsync(
() -> doSomeLongComputation(arg)
))
.map(future -> future.thenCombine(
CompletableFuture.supplyAsync(
() -> doSomeLongComputation("test")),
(resultFromFirstFuture, resultFromSecondsFuture) -> resultFromFirstFuture * resultFromSecondsFuture
))
.collect(toList());
return futures.stream()
.map(CompletableFuture::join)
.collect(toList());
notes: thenCombine / thenCombineAsync
Asyncの接尾辞がつく場合、
2つ目の引数のBiFunction
は、スレッドプールに渡され、
異なるタスクとして非同期で実行されるようになる。