はじめに
この記事はExecutorフレームワークのありがたみを知るシリーズ第5弾、ExecutorCompletionServiceに関する記事です。
関連記事:
- Executors#newSingleThreadExecutor編
- Executors#newFixedThreadPool編
- Executors#newCachedTheadPool編
- Callable/Future編
参考:
第49回 マルチスレッド編 CompletionServiceインターフェース
Futureの問題点
Callable/Future編では、タスクの戻り値やタスクで投げられた例外を取得することができるFutureについてまとめました。
便利なFutureですが、実際使ってみると問題点があります。
例によって、例によって、以下の「受け取った数値になんらかの計算を施す」タスクについて考えます。
今回はFutureについての話なので、タスクはCallableとしています。
public class MyTask implements Callable<Integer> {
private int value;
public MyTask(int value) {
this.value = value;
}
@Override
public Integer call() throws Exception {
System.out.println("[start] value:" + value + " thread-id:" + Thread.currentThread().getId());
// 何か複雑な処理
System.out.println("[ end ] value:" + value + " thread-id:" + Thread.currentThread().getId());
return value + 10;
}
}
問題は、複数のタスクを実行しようとした時に現れます。
終わった順に処理できない
上記のタスクを複数回呼び出し、それぞれ結果を出力することを考えます。スレッド数は固定とします。
以下のように書けます。
public class App {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(3);
List<Integer> valueList = Arrays.asList(1, 2, 3, 4, 5);
List<Future<Integer>> results = new ArrayList<>();
for (int value : valueList) {
results.add(executor.submit(new MyTask(value)));
}
for(Future<Integer> result: results){
System.out.println(result.get());
}
}
}
// コンソールの出力:
// [start] value:1 thread-id:20
// [start] value:3 thread-id:22
// [ end ] value:1 thread-id:20
// [ end ] value:3 thread-id:22
// [start] value:4 thread-id:22
// [ end ] value:4 thread-id:22
// [start] value:5 thread-id:20
// [start] value:2 thread-id:21
// 11
// [ end ] value:5 thread-id:20
// [ end ] value:2 thread-id:21
// 12
// 13
// 14
// 15
一見問題なさそうに見えます。
ただし、例えばvalue=1の計算がすごく時間のかかるものだったらどうでしょうか?
それ以降の処理が先に終わっていたとしても、遅い処理のせいで全体が止まってしまいます。
コンソールの出力をみると、1の次に3が終わっていますが、2の出力が終わるまで3は出力されていません。
せっかく並列処理をしているのに、最終の処理が直列となり遅い処理に足を引っ張られます。
好ましくありませんね。
リストにFutureを詰め込んで、頭から順に回しているからこういうことになるのですが、では「終わった順に取り出す」ことを実装することを想像してみてください。
一筋縄では行かなそうという事がわかると思います。
ExecutorCompletionServiceを使って解決する
そこで使えるのがExecutorCompletionServiceです。
ExecutorCompletionServiceとは
ExecutorServiceをコンストラクタの引数にとる、ExecutorServiceの上位存在とも言えるものです。
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
ExecutorCompletionServiceのtakeを使うと、終わった順に結果を取得する事ができます。
実行自体は内側のExecutorServiceが行いますが、その外側(上位存在)であるCompletionServiceが中のExecutorServiceを監視して、終わった順に返す、というのを実現してくれます。
終わった順に処理する
ExecutorCompletionServiceを使うと、メインメソッドは以下のように書けます。
public class App {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
List<Integer> valueList = Arrays.asList(1, 2, 3, 4, 5);
for (int value : valueList) {
completionService.submit(new MyTask(value));
}
for (int i = 0; i < valueList.size(); i++) {
Future<Integer> result = completionService.take();
System.out.println(result.get());
}
}
}
// コンソールの出力:
// [start] value:2 thread-id:21
// [start] value:1 thread-id:20
// [start] value:3 thread-id:22
// [ end ] value:3 thread-id:22
// [ end ] value:2 thread-id:21
// [ end ] value:1 thread-id:20
// [start] value:5 thread-id:21
// [ end ] value:5 thread-id:21
// [start] value:4 thread-id:20
// [ end ] value:4 thread-id:20
// 13
// 12
// 11
// 15
// 14
コンソールの出力をみると、確かに「終わった順」に出力されていますね。
実際「終わった順」を自力で実装しようと思うと最低でも十数行〜数百行が必要になると思いますが、CompletionServiceを使うことで元のシンプルな実装をほとんど変えずに実現する事ができます。
やはり我々を本当にやりたいこと、タスクに集中させてくれる存在という事がわかります。
おわりに
Executorフレームワークは先人たちの血の滲む苦労の上に誕生したものだと思います。
戻り値を扱いたくてFutureを作った、実際使ってみたら上記の問題があった、じゃあそれを解決する何かを作ろう、という歴史を感じます。
(実際、CompletionServiceは少し後に追加されたもののようです)
先人たちの血の滲む苦労
そりゃあ、スレッドプールやら「終わった順」やらを自力で作ってみようと思ったら大変なわけですよ