Javaで複雑なスレッド処理
Javaには当初からThreadクラスがあるが、インスタンス生成が重くて微妙だった。
JDK5からはExecutorが導入されて、WorkerThread的なことはやりやすくなったし、Lock系のクラスも増えて便利になった。また、Futureの導入により処理結果を受け取るのも楽になった。
とはいえ、Future.get()を呼ぶと呼び出しThreadがブロックするからそれもまた別Threadでやって、というようになんか本質的じゃない処理を書かないといけなかったのも確か。
Java8からはCompletableFutureが導入され、より複雑なThread処理を行うことができるようになったようなので、調査をする。
前提知識
同じくJava8で導入されたFunction, Consumer, Supplierを理解していないとよく分からないと思う。以下に記事書いたので参考にしていただければ。
Java8のFunction, Consumer, Supplier, Predicateの調査
処理の結果ある値を返して、それを使って別の処理をする
ある値を返すのはSupplierで、値を受け取って処理をするのはConsumerなので、それら二つを組み合わせる。
CompletableFuture.supplyAsync(Supplier)で、非同期にSupplierを処理しつつ、CompletableFutureのインスタンスを返す。
CompletableFuture.thenAcceptAsync(Consumer)で、CompletableFutureインスタンスの処理が終了したら戻り値を渡してConsumerの処理を実行する。
public void SupplyAndConsume() {
Supplier<Integer> initValueSupplier = () -> 100;
Consumer<Integer> valueConsumer = value -> System.out.println(value);
CompletableFuture<Void> future =
CompletableFuture.supplyAsync(initValueSupplier)
.thenAcceptAsync(valueConsumer);
}
処理結果
100
処理の結果ある値を返して、それを変換して、それを使って別の処理をする
ある値を返すのはSupplierで、変換はFunctionで、それを受け取って処理をするのはConsumerで実施する。
基本的には前のパターンと一緒だが、CompletableFuture.thenApplyAsync(Function)で、CompletableFutureの結果の値を渡しながらFunctionの処理を実行する。
public void SupplyAndExecuteAndConsume() {
Supplier<Integer> initValueSupplier = () -> 100;
Function<Integer, Integer> multiply = value -> value * 2;
Consumer<Integer> valueConsumer = value -> System.out.println(value);
CompletableFuture<Void> future =
CompletableFuture.supplyAsync(initValueSupplier)
.thenApplyAsync(multiply)
.thenAcceptAsync(valueConsumer);
}
処理結果
200
ある処理を複数スレッドで実施して、最初に得られた結果を使って別の処理をする
Supplierで処理した結果を返して、Consumerで受け取った値を使って処理をする。
CompletableFuture.acceptEitherAsync(CompletableFuture, Consumer)にて、先に結果が出たものを使ってConsumer処理をさせるということができる。
public void RaceAndConsume() {
Supplier<Integer> initValueSupplier = () -> 100;
Supplier<Integer> anotherValueSupplier = () -> 200;
Consumer<Integer> valueConsumer = value -> System.out.println(value);
CompletableFuture<Integer> future1 =
CompletableFuture.supplyAsync(initValueSupplier);
CompletableFuture<Integer> future2 =
CompletableFuture.supplyAsync(anotherValueSupplier);
future1.acceptEitherAsync(future2, valueConsumer);
}
100 or 200
ある処理を複数スレッドで実施して、最初に得られた結果を使って別の処理をし、その結果を使って別の処理をする
public void RaceAndConsume() {
Supplier<Integer> initValueSupplier = () -> 100;
Supplier<Integer> anotherValueSupplier = () -> 200;
Function<Integer, Integer> multiply = value -> value * 2;
Consumer<Integer> valueConsumer = value -> System.out.println(value);
CompletableFuture<Integer> future1 =
CompletableFuture.supplyAsync(initValueSupplier);
CompletableFuture<Integer> future2 =
CompletableFuture.supplyAsync(anotherValueSupplier);
future1.applyToEitherAsync(future2, multiply)
.thenAcceptAsync(valueConsumer);
}
200 or 400
synchronizedについて完全理解したいなら
synchronizedの解説と典型的誤り例
を書いたので、こちらもどうぞ。経験的に、7割くらいの人がなにか間違っています。