Help us understand the problem. What is going on with this article?

言語によってちょっと違うFuture/Promiseをまとめてみた(1)

More than 5 years have passed since last update.

はじめに

最近非同期プログラミングに触れる機会が増えてきていますが、言語ごとに使い方が少しずつ異なるので、よく使うJava、JavaScript(ECMAScript6)、Scala についてまとめてみました。

まとめる前の時点ではこんな程度の理解でした。

  • Future は get するとブロックするやつだよね!
  • Promise は コールバック登録するやつで Future とセットで使うんだよね!
  • スレッドに責任持っているクラスとの連携が必要なはずだけどググらないとわからないや……
  • エラー処理とかどうするんだろう?

Java 1.5 以降の場合

1.5 以降、java.util.concurrent 配下に、非同期実行に関連するインターフェースが定義されています。

Future<V>

  • get で結果Vが返ってくるまで待つ
  • cancel で キャンセル。isCanceledisDone で状態を調べられる
  • 生成は Future<T> ExecutorService.submit(Callable<T> myfunc) に依頼
    • Callable<V>V call() という重い処理を入れるためのコールバックを持つ
    • 処理結果がいらないなら Future<?> ExecutorService.submit(Runnable myfunc) もある

FutureTask<V>

  • FutureRunnable を継承しており、完了時のコールバック done を持っている
  • 生成するときに Callable を引数にとる
  • Runnable だから ExecuterService に渡すときは void execute(Runnable job) メソッド。submit だと戻り値で別のFutureが返ってきてよくわからないことに

Futureが完了したら何かをする」というコールバックまわりをどう書くべきか悩ましいままコード例を書いてみましたが、タイムアウトをいれようとすると Future.get(long, TimeUnit) を使うしかなく、そうすると done は何をしたらよいの?ということになってしまいました。やりたいことと FutureTask が合わない感じです。

ググってみてもコールバックを登録する例がでてこないし、現実的にタイムアウトは無視できない以上、FutureTask は処理終了のコールバックを呼び出す十分な機能を持ってないように思います。

FutureSample.java
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 重い処理を登録
        Callable<String> heavy = new Callable<String>() {
            @Override public String call() throws InterruptedException {
                Thread.sleep(2000);
                return "Je suis fatigue!";
            }
        };

        // コールバックを登録
        FutureTask<String> myTask = new FutureTask<String>(heavy) {
            @Override protected void done() {
                try {
                    puts("result: " + get()); // ここではブロックしない
                } catch {} // 記述省略
            }
        };

        // 実行!!
        executor.execute(myTask);

        // タイムアウトを考慮しようとすると get の他の方法が見つからず、getしてしまうと done を定義した意味がなくなる
        String result = myTask.get(1000, TimeUnit.MILLISECONDS)

        executor.shutdown();

Java8の場合

CompletableFuture と CompletionStage

と思って調べていたら、Java8 に CompletableFuture が導入されていました。

  • complete で外部からの完了通知を受けます
  • CompletableFuture.supplyAsync(Supplier myfunc)CompletableFuture を生成する。executorを引数にとることもできる

  • 結果のハンドリングは、親クラスの CompletionStage のメソッド群で行う

    • stage.thenApply(x -> x * 2).thenApply(x -> newStage(x)).thenRun(x -> System.out.println()) のように書けるようだ
  • ただ、利用方法の組み合わせ毎に膨大なメソッドがある。以下のように把握できそう……と思ったけどちょっと覚えられる気がしない

CompletionStageメソッド探検

先頭の動詞部分

  • runAfterXX : 正常終了時に Runnable に渡す。(結果はそこで途切れる)
  • acceptXX : 正常終了時に Consumer に渡す。(結果はそこで途切れる)
  • applyXX : 正常終了時に Function に渡す。Either の前の時は To が付いて applyToEitherXX になる
  • handleXX : 正常終了と異常終了を両方扱える BiConsumer に渡す。
  • then... : 自分自身の終了を待つときの枕詞。Either の時は then はつかない。ただしBoth の場合でも thenRunBoth ではなくて runAfterBoth だったりして少々不規則変化している
  • whenComplete : BiConsumer をとり、終了時に何らかの処理を行うが元の結果を引き継ぐ(tap的なもの)

同期または非同期

  • XXAsync : 引数の最後に Executor をとれる。省略するとデフォルトのExecutorになる

他の ComplationStage との待ち合わせや合成

  • Either : 引数に 他の CompletionStage をとり、そちらの終了結果を待ち合わせる。
  • Both : 引数に他の CompletionStage をとり、これと自分自身の両方の終了を待ち BiFunctionBiConsumer に渡す
  • compose : applyだけど 関数の戻り値が ComplationStage の場合
  • combine : applyBoth は存在せず、それに相当するものを combine と呼んでいるようだ

こちらの記事 も参考になりました

Play! Framework 1 for Java の場合

Play1 には F.Promise が定義されています。

  • Future<V>F.Action<V> を継承しています
    • Fufure<V>java.util.concurrent のものなので get できる
    • Action<V>void invoke(V result) のみ定義されたインターフェース
  • V get() で結果が返るまでブロック
  • onRedeem(Action<Promise<V>> callback) で完了時のコールバックを登録。いくつでも登録できる。

誰がどうやって完了を通知するのか?

  • invoke(V result) で結果を登録
  • この Promise を引数にとる Callable を作り、call() の中の重い処理の最後に invoke をたたく作りにすることが多いようだ
  • そうやって作った Callable をスレッド作る人に渡す。

play.jobs.Job から Promise 生成部分の例を抜粋

Job.java
    public Promise<V> in(int seconds) {
        final Promise<V> smartFuture = new Promise<V>();
        JobsPlugin.executor.schedule(getJobCallingCallable(smartFuture), seconds, TimeUnit.SECONDS);
        return smartFuture;
    }

    private Callable<V> getJobCallingCallable(final Promise<V> smartFuture) {
      return new Callable<V>() {
        public V call() throws Exception {
          try {
            V result = Job.this.call();
            if (smartFuture != null) {
              smartFuture.invoke(result);
            }
            return result;
          }
          catch (Exception e) {
            if (smartFuture != null) {
              smartFuture.invokeWithException(e);
              }
            return null;
          }
        }
      };
    }

Promise は合成ができます

  • Promise<List<T>> Promise.waitAll(Promise<V> promises...)Promise を合成できる。
  • Promise<F.Tuple<A, B>> wait2(Promise<A> tA, Promise<B> tB) など型が違う Promise を合成することもできる。
  • これらはすべての Promise の終了を待つ
  • 一つでも先に終わればよい場合は Promise<T> waitAny(Promise<T>... futures)Promise<T> waitEither(Promise<A> tA, Promise<B> tB)) がある
  • タイムアウトしたい場合は Timeout という Promise を継承したタイムアウト用クラスを作り、waitAny で合成するようだ
Promise task = F.Promise.waitAny(F.Timeout(100), myPromise);

F.EventStream

こちらも面白そうですが、大変そうなので今回はパスすることにします。

参考: Play1 の Promise のソース

Play2Java、JavaScript、Scala

やはりというかなんというか、思ったより大変なので Play2Java、JavaScript、 Scala は次回に続くことにします。

reki2000
3Dエンジンの中身とかCPUを作るとかコンパイラを作るなど低レイヤ好きです。8bit時代のネタにはよく釣られます。昔から C、SQL、Java、Scala をよく使い、最近は Python や Go を使うことが多いですが、Julia や Rust がやりたいことに向いている気がしてきており勉強中です。
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away