概要
Future を使うと非同期処理が容易に実装できて便利ですよね。
でも、内部実装が良く分からないと、どのように使えば効果的なのか分かりません。それに、あまりにも簡単に非同期処理を実装できるから、何だかちょっと怖いです。何か変な副作用はないのか?とか思ってしまいます。
なので、ソースコードを追ってみることにしました。
scala のバージョンは 2.11 です。
Future の使用例
val f = Future {
// 時間がかかる処理
}
f onComplete {
case Failure(e) => println( "fail" )
case Success(r) => println( "success" )
}
Future 処理の実行
Future{...} のブロック内のコードはどのようにして実行されるのでしょうか。
java でスレッドを生成する際にお馴染みの Runnable.run で実行されます。
Future{...} は下記の apply メソッドの呼び出しになるので、{...} は PromiseCompletingRunnable のコンストラクタの引数 body として渡され、run メソッド内で呼び出されています。
private[concurrent] object Future {
class PromiseCompletingRunnable[T](body: => T) extends Runnable {
val promise = new Promise.DefaultPromise[T]()
override def run() = {
promise complete {
try Success(body) catch { case NonFatal(e) => Failure(e) }
}
}
}
def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
val runnable = new PromiseCompletingRunnable(body)
executor.prepare.execute(runnable)
runnable.promise.future
}
}
PromiseCompletingRunnable の run メソッドの実行タイミングですが、上記の apply メソッド内の以下の処理で、scala の管理しているスレッドプールのキューに runnable が追加されます。そのため、Future{...} を呼び出したスレッドとは別のスレッドで実行されます。
executor.prepare.execute(runnable)
では run メソッドの中身を見ていきましょう。
run メソッド内で Future{...} のブロックの実行、および、戻り値の格納(Future のインスタンス変数への代入)が行われているはずです。
override def run() = {
promise complete {
try Success(body) catch { case NonFatal(e) => Failure(e) }
}
}
ブロックの戻り値が promise complete に渡されていますね。
complete メソッドを追っていくと、以下のメソッドにたどり着きます。
def tryComplete(value: Try[T]): Boolean = {
val resolved = resolveTry(value)
tryCompleteAndGetListeners(resolved) match {
case null => false
case rs if rs.isEmpty => true
case rs => rs.foreach(r => r.executeWithValue(resolved)); true
}
}
引数の value はブロックの戻り値です。resolveTry は value をそのまま返すだけなので、tryCompleteAndGetListeners メソッドを追います。
@tailrec
private def tryCompleteAndGetListeners(v: Try[T]): List[CallbackRunnable[T]] = {
getState match {
case raw: List[_] =>
val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
if (updateState(cur, v)) cur else tryCompleteAndGetListeners(v)
case _: DefaultPromise[_] =>
compressedRoot().tryCompleteAndGetListeners(v)
case _ => null
}
}
getState, updateState だの、List[CallbackRunnable[T]] だのが出てきましたが、焦る必要はありません。
CallbackRunnable は、Future の結果を受け取る処理、例えば最初の使用例で挙げた f.onComplete{...} のブロックを実行する Runnable です。
getState, updateState メソッドですが、Promise は java.util.concurrent.atomic.AtomicReference(value インスタンス変数を atomic に更新できるクラス)を拡張しています。
@Deprecated // Since 2.11.8. Extend java.util.concurrent.atomic.AtomicReference instead.
abstract class AbstractPromise extends AtomicReference<Object> {
protected final boolean updateState(Object oldState, Object newState) { return compareAndSet(oldState, newState); }
protected final Object getState() { return get(); }
}
value は List[CallbackRunnable[T]](Future 処理の実行結果を使用する処理のリスト)または Future{...} ブロックの実行結果のようです。※value の型は Object なので型が異なっても問題ないです。
value の初期値として DefaultPromise のコンストラクタ内で Nil が設定されます。
というわけで、CallbackRunnable が List に追加されているかは分かりません(追加タイミングは f.onComplete 呼び出し時など)が、以下の case 文を通過して、最終的には実行結果が value に設定されます。
case raw: List[_] =>
val cur = raw.asInstanceOf[List[CallbackRunnable[T]]]
if (updateState(cur, v)) cur else tryCompleteAndGetListeners(v)
戻り値は CallbackRunnable の List になります。戻り値を処理するコードは、上で見た通り以下のようになっています。Listが空の場合は処理を終え、空でない場合は CallbackRunnable.executeWithValue が呼び出されます。
tryCompleteAndGetListeners(resolved) match {
case null => false
case rs if rs.isEmpty => true
case rs => rs.foreach(r => r.executeWithValue(resolved)); true
}
以上で、Future ブロックが実行されて、戻り値が Promise のインスタンス変数に格納され、CallbackRunnable に渡されるまでの処理を一通り追うことができました。
Future ブロックの戻り値の取得(非同期で処理する場合)
さて、Future の結果を使用する側はどうなっているでしょうか。
f.onComplete{...} の処理を追います。
def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
val preparedEC = executor.prepare()
val runnable = new CallbackRunnable[T](preparedEC, func)
dispatchOrAddCallback(runnable)
}
引数の func は f.onComplete{...} に渡しているブロックです。CallbackRunnable のコードは以下のようになっているので、executeWithValue メソッドが呼ばれるとキューに登録され、最終的に onComplete(=上の func 引数)メソッドが呼ばれることが分かります。
private class CallbackRunnable[T](val executor: ExecutionContext, val onComplete: Try[T] => Any) extends Runnable with OnCompleteRunnable {
// must be filled in before running it
var value: Try[T] = null
override def run() = {
require(value ne null) // must set value to non-null before running!
try onComplete(value) catch { case NonFatal(e) => executor reportFailure e }
}
def executeWithValue(v: Try[T]): Unit = {
require(value eq null) // can't complete it twice
value = v
// Note that we cannot prepare the ExecutionContext at this point, since we might
// already be running on a different thread!
try executor.execute(this) catch { case NonFatal(t) => executor reportFailure t }
}
}
dispatchOrAddCallback(runnable) の処理を追います。
@tailrec
private def dispatchOrAddCallback(runnable: CallbackRunnable[T]): Unit = {
getState match {
case r: Try[_] => runnable.executeWithValue(r.asInstanceOf[Try[T]])
case _: DefaultPromise[_] => compressedRoot().dispatchOrAddCallback(runnable)
case listeners: List[_] => if (updateState(listeners, runnable :: listeners)) () else dispatchOrAddCallback(runnable)
}
}
Future ブロックの実行が完了している場合は、先ほど見たように、getState の返り値は Future ブロックの実行結果になるので、即座にrunnable.executeWithValue メソッドが呼ばれます。
Future ブロックの実行が未完了の場合は、getState は CallbackRunnable の List を返すので、runnable :: listeners で自分自身が List に追加されます。Future ブロックの実行処理で見たように、Future ブロックの実行が完了したタイミングで List に入っている CallbackRunnable の executeWithValue メソッドが呼ばれます。
以上で、Future ブロックの実行結果を非同期で受け取る処理については分かりました。
Future ブロックの戻り値の取得(同期する場合)
同期する場合はどうでしょうか。
val r = Await.result (f, Duration.Inf)
Await.result メソッドは以下のようになっています。
@throws(classOf[Exception])
def result[T](awaitable: Awaitable[T], atMost: Duration): T =
blocking(awaitable.result(atMost)(AwaitPermission))
awaitable と言っても実体は DefaultPromise のインスタンスですので、DefaultPromise の result メソッドが呼び出されます。
@throws(classOf[Exception])
def result(atMost: Duration)(implicit permit: CanAwait): T =
ready(atMost).value.get.get // ready throws TimeoutException if timeout so value.get is safe here
ready メソッドの戻り値は this.type = DefaultPromise(つまり this)ですので、ready(atMost).value.get.get は Future ブロックの実行結果になります。
ready メソッドからは以下の tryAwait メソッドが呼ばれています。
protected final def tryAwait(atMost: Duration): Boolean = if (!isCompleted) {
import Duration.Undefined
import scala.concurrent.Future.InternalCallbackExecutor
atMost match {
case e if e eq Undefined => throw new IllegalArgumentException("cannot wait for Undefined period")
case Duration.Inf =>
val l = new CompletionLatch[T]()
onComplete(l)(InternalCallbackExecutor)
l.acquireSharedInterruptibly(1)
case Duration.MinusInf => // Drop out
case f: FiniteDuration =>
if (f > Duration.Zero) {
val l = new CompletionLatch[T]()
onComplete(l)(InternalCallbackExecutor)
l.tryAcquireSharedNanos(1, f.toNanos)
}
}
isCompleted
} else true // Already completed
Future ブロックの実行が完了している場合は isCompleted が true になるので、即座に返ります。
実行が未完了の場合は処理の完了を待ちます。
CompletionLatch クラスは AbstractQueuedSynchronizer クラス(java で用意されている同期処理を実装するための仕組み)を拡張しています。acquireSharedInterruptibly メソッドが呼び出されると、 state != 0 になるまで待ちます。state は releaseShared メソッドで更新されます。
また、CompletionLatch クラスは Function trait を拡張しているので、関数として扱うこともできます。
private final class CompletionLatch[T] extends AbstractQueuedSynchronizer with (Try[T] => Unit) {
override protected def tryAcquireShared(ignored: Int): Int = if (getState != 0) 1 else -1
override protected def tryReleaseShared(ignore: Int): Boolean = {
setState(1)
true
}
override def apply(ignored: Try[T]): Unit = releaseShared(1)
}
onComplete メソッド内では、Future が完了していれば CompletionLatch(の apply メソッドとして定義されている releaseShared)が実行され(実行する Runnable をキューに入れられ)、未完了であれば List[CallbackRunnable] に追加され、Future の完了時に releaseShared メソッドが呼ばれます。上で述べたように、releaseShared が実行されると state != 0 になるので tryAwait メソッドが返ります。
def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit = {
val preparedEC = executor.prepare()
val runnable = new CallbackRunnable[T](preparedEC, func)
dispatchOrAddCallback(runnable)
}
このようにして同期処理が実装されています。
まとめ
scala の内部コードを始めて読みました。こうやってコードを読んでみると多少は理解が進みますね。何となく怖かった Future も、これからは心配せずに使えそうです。
時間があれば ExecutionContext の実装を掘り下げてみたいです。