はじめに
この記事はExecutorフレームワークのありがたみを知るシリーズ第4弾、Callable/Futureに関する記事です。
関連記事:
参考:
- 3.1 スレッドの仕組みと操作(マルチスレッド、並行処理、並列処理、非同期呼び出し、スレッドセーフ、同期化、デッドロック、ウェイトセット、スレッドローカルなど)~Java Advanced編
- 3.2 並行処理ユーティリティとExecutorフレームワーク(スレッドプール、ExecutorService、Callable、Futureなど)~Java Advanced編
- ExecutorServiceを使って、Javaでマルチスレッド処理
自作スレッドの問題点
例によって、以下の「受け取った数値になんらかの計算を施す」タスクについて考えます。
public class MyTask implements Runnable{
private int value;
public MyTask(int value) {
this.value = value;
}
@Override
public void run() {
System.out.println("[start] value:" + value + " thread-id:" + Thread.currentThread().getId());
// 何か複雑な処理
System.out.println("[ end ] value:" + value+ " thread-id:" + Thread.currentThread().getId());
}
}
結果の取得が難しい
タスクでの計算結果をメイン側で受け取りたいとします。
Runnable#runは戻り値なし(void)、例外宣言もできません(throwsが宣言できない)。
基本的には、メイン側は「呼んだっきり」になってしまいます。
でも、裏側で計算させておいて、計算でき次第その結果を使いたいのです。さて、どうしましょう。
解決案の一つは、スレッド間の「共有変数」を作ることです。メインスレッドで変数を作成し、別スレッドに渡します。別スレッド内では結果をその変数に書き込ませます。メインスレッドでは別スレッドの終了を待ち、終了したら変数の値を読み取ります。
以下のように書けます。
// タスク定義
// 結果を格納する変数を受け取り、そこに書き込むように修正
public class MyTask implements Runnable{
private int value;
private AtomicInteger result; // 結果を格納する変数
public MyTask(int value, AtomicInteger result) {
this.value = value;
this.result = result;
}
@Override
public void run() {
System.out.println("[start] value:" + value + " thread-id:" + Thread.currentThread().getId());
// 何か複雑な処理
result.set(value+1);
System.out.println("[ end ] value:" + value+ " thread-id:" + Thread.currentThread().getId());
}
}
// メイン
public static void main(String[] args) {
AtomicInteger result = new AtomicInteger(); // 結果を格納する変数
Thread myThread = new Thread(new MyTask(10, result));
myThread.start();
try {
myThread.join(); // 処理完了を待つ
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("result: " + result.get());
}
複数のスレッドから参照される変数なので、「当たり前に」スレッドセーフな変数とする必要があります。
(これを当たり前にやるのはかなり難しいと考えています)
また、やはり複雑度が上がっていますね。やっていることは「裏で計算した結果を取得したい」ですが、ぱっと見でそのように読み取れるでしょうか?
やはり「難しい」のです。
失敗(例外発生)したことに気づけない
上述の通り、メイン側では「呼んだっきり」になってしまう性質上、タスクに失敗した、つまり例外が発生した場合の処理が非常に書きにくいです。
スレッドは、戻り値と同じように例外も呼び出し元に返すことができません。
複数の失敗パターンがあり、それぞれをタスク側で例外として投げるように実装したとしましょう。
以下のようなイメージです。
@Override
public void run() {
System.out.println("[start] value:" + value + " thread-id:" + Thread.currentThread().getId());
// 何か複雑な処理
process();
System.out.println("[ end ] value:" + value + " thread-id:" + Thread.currentThread().getId());
}
private void process() {
if (value == 1) {
throw new RuntimeException("なんらかのエラーA");
} else if (value <= 3) {
throw new RuntimeException("なんらかのエラーB");
} else {
throw new RuntimeException("なんらかのエラーC");
}
}
問題はメインスレッドがこの例外を素直に受け取ることができないということです。
解決案の一つは先ほどの結果受け取りと同様、変数として受け渡すことです。
以下のように書けます。
public class MyTask implements Runnable {
private RuntimeException exception;
private void process() {
if (value == 1) {
this.exception = new RuntimeException("なんらかのエラーA");
} else if (value <= 3) {
this.exception = new RuntimeException("なんらかのエラーB");
} else {
this.exception = new RuntimeException("なんらかのエラーC");
}
}
public RuntimeException getException() {
return this.exception;
}
}
public class App {
public static void main(String[] args) throws InterruptedException {
MyTask task = new MyTask(1);
Thread thread = new Thread(task);
thread.start();
thread.join();
if (task.getException() != null) {
task.getException().printStackTrace();
}
}
}
// コンソールの出力:
// [start] value:1 thread-id:20
// [ end ] value:1 thread-id:20
// java.lang.RuntimeException: なんらかのエラーA
// at thread.thread.MyTask.process(MyTask.java:21)
// at thread.thread.MyTask.run(MyTask.java:15)
// at java.base/java.lang.Thread.run(Thread.java:1583)
例外を投げずにインスタンスフィールドに保持するなんて、普通はやらないことですね。「普通やらないこと」は可読性が低いです。つまりわかりにくい、難しいです。
また、メインスレッド例外を取りに行かないといけない、というのも厄介です。メインスレッドがタスクの中身を熟知していることになります。依存関係としてふさわしくないですよね。
上記は半ば無理やり実装した場合ですので、普通ではないコードです。
では普通にしたらどうなるかというと、タイトルの通り、普通は「気づけない」のです。
Executorフレームワークを使って解決する
今回は Callable / Future というものを使うと綺麗に解決できます。
Callable/Futureとは
今までタスク定義はRunnableとして定義してきました。ここでは戻り値を返すことのできるCallableを使います。
使い方はRunnableと変わりません。Callableインターフェースをimplements宣言して、callメソッドを実装します。
public class MyTask implements Callable<Integer> {
private int value;
public MyTask(int value) {
this.value = value;
}
@Override
public Integer call() throws Exception {
System.out.println("[start] value:" + value + " thread-id:" + Thread.currentThread().getId());
// 何か複雑な処理
System.out.println("[ end ] value:" + value + " thread-id:" + Thread.currentThread().getId());
return value + 10;
}
}
callメソッドのシグネチャを見てください。戻り値をもち、throws宣言があります。つまり値を返したり例外を投げたりする事が可能なのです。
FutureはCallableの戻り値を扱うものです。
ExecutorService#getを使って、Callableの戻り値をFutureとして取得する事ができます。
public class App {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Integer> result = executor.submit(new MyTask(1));
try {
System.out.println(result.get());
}catch(Exception e){
e.printStackTrace();
}
}
}
結果を取得する
それでは、結果を取得してみましょう。
コードを上記の通りとすると、タスクの戻り値はvalue+10です。
実行結果はその通りになります。
// コンソールの出力:
// [start] value:1 thread-id:20
// [ end ] value:1 thread-id:20
// 11
スレッド間の共有変数などといった「変なこと」を一切せずに値を取得する事ができました。
例外を受け取る
続いて、例外を投げるようにしてみましょう。
mainメソッドは上記のまま、タスクだけ例外を投げるように変えてみます。
public class MyTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("[start] value:" + value + " thread-id:" + Thread.currentThread().getId());
// 何か複雑な処理
process();
System.out.println("[ end ] value:" + value + " thread-id:" + Thread.currentThread().getId());
return value + 10;
}
private void process() {
if (value == 1) {
throw new RuntimeException("なんらかのエラーA");
} else if (value <= 3) {
throw new RuntimeException("なんらかのエラーB");
} else {
throw new RuntimeException("なんらかのエラーC");
}
}
}
// コンソールの出力:
// java.util.concurrent.ExecutionException: java.lang.RuntimeException: なんらかのエラーA
// at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
// at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
// at thread.App.main(App.java:14)
// Caused by: java.lang.RuntimeException: なんらかのエラーA
// at thread.thread.MyTask.process(MyTask.java:23)
// at thread.thread.MyTask.call(MyTask.java:16)
// at thread.thread.MyTask.call(MyTask.java:5)
// at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
// at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
// at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
// at java.base/java.lang.Thread.run(Thread.java:1583)
タスク側はいつも通り例外を投げたらいいだけ、メイン側もいつも通りcatchしたらいいだけで、「普通」の書き方ができます。
自力でやろうとするとどうしても複雑になってしまう処理を、こうも綺麗に隠蔽できるなんて、素晴らしいと思いませんか。
おわりに
初学の時はFutureという概念を全く理解できなかったのですが、今となってはなぜあんなに意味不明だと恐れていたのかわかりません。
理解できない、怖い、と思っている時こそ、このようなまとめ記事を書くのに最適なタイミングだと思います。
一度理解できてしまうと、もう理解できなかった時の疑問を思い出せないんですよね。
本題とずれてしまいましたが、Callable, Future怖くないよ、という話でした