Finagleを使っていると、twitter.util.Futureと付き合うことになります。その挙動について記しておきます。
Future.value, Future.applyの挙動がscala.concurrentと違う
TL;DR.
-
com.twitter.util.Future
はそれだけではasyncで実行しない。FuturePool.unboundedPool
等を使う必要がある。 -
scala.concurrent.Future
はapplyすると即ExecutorServiceで実行される -
Future.value
は単にConstFutureのインスタンスを作って評価済みの値をFutureに包むだけ
Pop quiz
Pop quizに似た感じで、以下の状況を考えてみる
import com.twitter.util.{Future => TwFuture, Await => TwAwait}
val twitterF1 = TwFuture {
println("twitter f1 start")
Thread.sleep(2000)
}
twitterF1 onSuccess (_ => println("twitter f1 end"))
val twitterF2 = TwFuture {
println("twitter f2 start")
Thread.sleep(1000)
}
twitterF2 onSuccess (_ => println("twitter f2 end"))
TwAwait.all(twitterF1, twitterF2)
出力は以下。
f1 endがf2 startよりも先。
twitter f1 start
twitter f1 end
twitter f2 start
twitter f2 end
ちなみに、scala Futureだと以下でasync実行されます。
import scala.concurrent.duration.Duration
import scala.concurrent.{Future => SFuture, Await => SAwait}
import scala.concurrent.ExecutionContext.Implicits.global
val scalaF1 = SFuture {
println("scala f1 start")
Thread.sleep(2000)
}
scalaF1 onComplete(x => println("scala f1 onComplete"))
val scalaF2 = SFuture {
println("scala f2 start")
Thread.sleep(1000)
}
scalaF2 onComplete(x => println("scala f2 onComplete"))
SAwait.result(scalaF1, Duration(10000, "millis"))
SAwait.result(scalaF2, Duration(10000, "millis"))
scala f1 start
scala f2 start
scala f2 onComplete
scala f1 onComplete
twitter.util.Futureでasync実行させるには、以下のように明示的にFuturePoolを使う必要があります。
import com.twitter.util.{Future => TwFuture, Await => TwAwait, FuturePool}
val twitterF1 = FuturePool.unboundedPool {
println("twitter f1 async start")
Thread.sleep(2000)
}
twitterF1 onSuccess (_ => println("twitter f1 async end"))
val twitterF2 = FuturePool.unboundedPool {
println("twitter f2 async start")
Thread.sleep(1000)
}
twitterF2 onSuccess (_ => println("twitter f2 async end"))
TwAwait.all(twitterF1, twitterF2)
twitter f1 async start
twitter f2 async start
twitter f2 async end
twitter f1 async end
実装の確認
Future.applyの実装を見てみます。
def apply[A](a: => A): Future[A] = try {
const(Try(a))
} catch {
case nlrc: NonLocalReturnControl[_] => Future.exception(new FutureNonLocalReturnControl(nlrc))
}
Tryのコンストラクタは以下
def apply[R](r: => R): Try[R] = {
try { Return(r) } catch {
case NonFatal(e) => Throw(e)
}
}
正常終了した値であることを示すReturnに値を渡しています。
この段階でFuture.applyに渡された処理(今回はThread.sleepとか)が評価され、Returnに格納されます。
その後、Futureのインスタンスに対してonSuccessやflatMapが呼ばれるタイミングでConstFutureのrespondが呼ばれます。
この中でScheduler(デフォルトはLocalScheduler)にRunnableがsubmitされます。kはflatmapで渡されたコールバック。
def respond(k: Try[A] => Unit): Future[A] = {
val saved = Local.save()
Scheduler.submit(new Runnable {
def run() {
val current = Local.save()
Local.restore(saved)
try Monitor { k(result) } finally Local.restore(current)
}
})
this
}
Schedulerへのsubmitではjava.util.ArrayDeque
にたいしてRunnableが登録されていきます。Schedulerが持っているActivationがrunnningでなければ、タスクはすぐに実行されます。
ちなみに、Futureインスタンスに対してflatMapが呼ばれた際はtransformが実行されます。これは結果をFuture[B]の形で返すため、Promiseを作成します。
def transform[B](f: Try[A] => Future[B]): Future[B] = {
val p = new Promise[B]
respond({ r =>
val result = try f(r) catch {
case e: NonLocalReturnControl[_] => Future.exception(new FutureNonLocalReturnControl(e))
case NonFatal(e) => Future.exception(e)
}
p.become(result)
})
p
}
このように、ExecutorServiceは出てきません。
まとめ
com.twitter.util.Future
は非同期処理が起こる可能性を示していますが、Future.apply
等では非同期実行されません。
戻り値としてのFutureを扱う側としては、非同期実行されているかどうかはあずかり知らぬ話なので気にする必要はないですが、実装時にはscala.concurrentとの違いがあることを頭に留めておいたほうが良さそう。