はじめに
最近非同期プログラミングに触れる機会が増えてきていますが、言語ごとに使い方が少しずつ異なるので、よく使うJava、JavaScript(ECMAScript6)、Scala についてまとめてみました。
まとめる前の時点ではこんな程度の理解でした。
- Future は get するとブロックするやつだよね!
- Promise は コールバック登録するやつで Future とセットで使うんだよね!
- スレッドに責任持っているクラスとの連携が必要なはずだけどググらないとわからないや……
- エラー処理とかどうするんだろう?
Java 1.5 以降の場合
1.5 以降、java.util.concurrent
配下に、非同期実行に関連するインターフェースが定義されています。
Future<V>
- get で結果Vが返ってくるまで待つ
-
cancel
で キャンセル。isCanceled
、isDone
で状態を調べられる - 生成は
Future<T> ExecutorService.submit(Callable<T> myfunc)
に依頼 -
Callable<V>
はV call()
という重い処理を入れるためのコールバックを持つ - 処理結果がいらないなら
Future<?> ExecutorService.submit(Runnable myfunc)
もある
FutureTask<V>
-
Future
とRunnable
を継承しており、完了時のコールバック done を持っている - 生成するときに
Callable
を引数にとる -
Runnable
だからExecuterService
に渡すときはvoid execute(Runnable job)
メソッド。submit
だと戻り値で別のFutureが返ってきてよくわからないことに
「Future
が完了したら何かをする」というコールバックまわりをどう書くべきか悩ましいままコード例を書いてみましたが、タイムアウトをいれようとすると Future.get(long, TimeUnit)
を使うしかなく、そうすると done は何をしたらよいの?ということになってしまいました。やりたいことと FutureTask
が合わない感じです。
ググってみてもコールバックを登録する例がでてこないし、現実的にタイムアウトは無視できない以上、FutureTask
は処理終了のコールバックを呼び出す十分な機能を持ってないように思います。
ExecutorService executor = Executors.newSingleThreadExecutor();
// 重い処理を登録
Callable<String> heavy = new Callable<String>() {
@Override public String call() throws InterruptedException {
Thread.sleep(2000);
return "Je suis fatigue!";
}
};
// コールバックを登録
FutureTask<String> myTask = new FutureTask<String>(heavy) {
@Override protected void done() {
try {
puts("result: " + get()); // ここではブロックしない
} catch {} // 記述省略
}
};
// 実行!!
executor.execute(myTask);
// タイムアウトを考慮しようとすると get の他の方法が見つからず、getしてしまうと done を定義した意味がなくなる
String result = myTask.get(1000, TimeUnit.MILLISECONDS)
executor.shutdown();
Java8の場合
CompletableFuture と CompletionStage
と思って調べていたら、Java8 に CompletableFuture が導入されていました。
-
complete
で外部からの完了通知を受けます -
CompletableFuture.supplyAsync(Supplier myfunc)
でCompletableFuture
を生成する。executorを引数にとることもできる -
結果のハンドリングは、親クラスの CompletionStage のメソッド群で行う
-
stage.thenApply(x -> x * 2).thenApply(x -> newStage(x)).thenRun(x -> System.out.println())
のように書けるようだ
-
-
ただ、利用方法の組み合わせ毎に膨大なメソッドがある。以下のように把握できそう……と思ったけどちょっと覚えられる気がしない
CompletionStageメソッド探検
先頭の動詞部分
-
runAfterXX
: 正常終了時にRunnable
に渡す。(結果はそこで途切れる) -
acceptXX
: 正常終了時にConsumer
に渡す。(結果はそこで途切れる) -
applyXX
: 正常終了時にFunction
に渡す。Either の前の時は To が付いてapplyToEitherXX
になる -
handleXX
: 正常終了と異常終了を両方扱えるBiConsumer
に渡す。 -
then...
: 自分自身の終了を待つときの枕詞。Either の時は then はつかない。ただしBoth の場合でも thenRunBoth ではなくて runAfterBoth だったりして少々不規則変化している -
whenComplete
:BiConsumer
をとり、終了時に何らかの処理を行うが元の結果を引き継ぐ(tap的なもの)
同期または非同期
-
XXAsync
: 引数の最後にExecutor
をとれる。省略するとデフォルトのExecutorになる
他の ComplationStage
との待ち合わせや合成
-
Either
: 引数に 他のCompletionStage
をとり、そちらの終了結果を待ち合わせる。 -
Both
: 引数に他のCompletionStage
をとり、これと自分自身の両方の終了を待ちBiFunction
やBiConsumer
に渡す -
compose
: applyだけど 関数の戻り値がComplationStage
の場合 -
combine
: applyBoth は存在せず、それに相当するものを combine と呼んでいるようだ
[こちらの記事] (http://koiroha.blogspot.jp/2014/04/java8-completablefuture-usefull-asa-scala-future.html) も参考になりました
Play! Framework 1 for Java の場合
Play1 には F.Promise
が定義されています。
-
Future<V>
とF.Action<V>
を継承しています-
Fufure<V>
はjava.util.concurrent
のものなのでget
できる -
Action<V>
はvoid invoke(V result)
のみ定義されたインターフェース
-
-
V get()
で結果が返るまでブロック -
onRedeem(Action<Promise<V>> callback)
で完了時のコールバックを登録。いくつでも登録できる。
誰がどうやって完了を通知するのか?
-
invoke(V result)
で結果を登録 - この Promise を引数にとる Callable を作り、call() の中の重い処理の最後に invoke をたたく作りにすることが多いようだ
- そうやって作った Callable をスレッド作る人に渡す。
play.jobs.Job から Promise 生成部分の例を抜粋
public Promise<V> in(int seconds) {
final Promise<V> smartFuture = new Promise<V>();
JobsPlugin.executor.schedule(getJobCallingCallable(smartFuture), seconds, TimeUnit.SECONDS);
return smartFuture;
}
private Callable<V> getJobCallingCallable(final Promise<V> smartFuture) {
return new Callable<V>() {
public V call() throws Exception {
try {
V result = Job.this.call();
if (smartFuture != null) {
smartFuture.invoke(result);
}
return result;
}
catch (Exception e) {
if (smartFuture != null) {
smartFuture.invokeWithException(e);
}
return null;
}
}
};
}
Promise は合成ができます
-
Promise<List<T>> Promise.waitAll(Promise<V> promises...)
でPromise
を合成できる。 -
Promise<F.Tuple<A, B>> wait2(Promise<A> tA, Promise<B> tB)
など型が違う Promise を合成することもできる。 - これらはすべての
Promise
の終了を待つ - 一つでも先に終わればよい場合は
Promise<T> waitAny(Promise<T>... futures)
、Promise<T> waitEither(Promise<A> tA, Promise<B> tB))
がある - タイムアウトしたい場合は
Timeout
というPromise
を継承したタイムアウト用クラスを作り、waitAny
で合成するようだ
Promise task = F.Promise.waitAny(F.Timeout(100), myPromise);
F.EventStream
こちらも面白そうですが、大変そうなので今回はパスすることにします。
Play2Java、JavaScript、Scala
やはりというかなんというか、思ったより大変なので Play2Java、JavaScript、 Scala は次回に続くことにします。