scala.concurrent.Await周りのコードを読んだのでメモ。
使用例
Await.result(Future(something), 3.seconds)
説明
Await.resultの定義を見ると、
object Await {
...
def result[T](awaitable: Awaitable[T], atMost: Duration): T =
blocking(awaitable.result(atMost)(AwaitPermission))
Awaitableを受け取っている。当然FutureはAwaitableを継承していることになる。
trait Future[+T] extends Awaitable[T] {
...
resultメソッドを見てみる。FutureがAwaitableを継承しているが、実装はDefaultPromiseにある。
class DefaultPromise {
...
def result(atMost: Duration)(implicit permit: CanAwait): T =
ready(atMost).value.get.get
readyを呼んでいる↓
class DefaultPromise {
...
def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
if (tryAwait(atMost)) this
else throw new TimeoutException("Futures timed out after [" + atMost + "]")
tryAwaitを呼んでいる↓
class DefaultPromise {
...
protected final def tryAwait(atMost: Duration): Boolean = if (!isCompleted) {
...
atMost match {
...
case f: FiniteDuration =>
if (f > Duration.Zero) {
val l = new CompletionLatch[T]()
onComplete(l)(InternalCallbackExecutor)
l.tryAcquireSharedNanos(1, f.toNanos)
}
}
isCompleted
} else true // Already completed
tryAcquireSharedNanosが、指定時間ブロッキングする。
もし実行中の処理が終わって、onCompleteのCompletionLatchが実行されるとtryAcquireSharedNanosがreleaseされて、isCompletedが返る。(完了してるのでtrue)
もし実行中の処理が終わる前に、tryAcquireSharedNanosのブロッキングが終わってしまったら、isCompletedが返り(完了してないのでfalse)、throw TimeoutExceptionされる。
後は、value(完了してるので値がある)を返すだけ。
ちなみにCompletionLatchは、AbstractQueuedSynchronizerを使っている。
private final class CompletionLatch[T] extends AbstractQueuedSynchronizer with (Try[T] => Unit) {
override protected def tryAcquireShared(ignored: Int): Int = if (getState != 0) 1 else -1
override protected def tryReleaseShared(ignore: Int): Boolean = {
setState(1)
true
}
override def apply(ignored: Try[T]): Unit = releaseShared(1)
}
結構カオスなコードだと思うけど、自分で書けと言われてもきれいに書ける自信ない。やはりscalazのFutureは読み見やすいのでいいなぁと思った。
おまけ
こっちにも書いてますが、スレッドが全部使われているとブロッキングするので予期せずタイムアウトしてしまうので注意。
// コア数は4つなので、デフォルトで作成可能なスレッドは4つ。
scala> Runtime.getRuntime.availableProcessors
res154: Int = 4
// インポート
scala> import scala.concurrent._, duration._, ExecutionContext.Implicits.global
import scala.concurrent._
import duration._
import ExecutionContext.Implicits.global
// 5秒待つ関数定義
scala> def sleep = { Thread.sleep(5000); println("done") }
sleep: Unit
// Future4つででスレッド4つスリープしてる状態で、Awaitすると実行可能なスレッドがないのでタスクが待ちになる。
// しかしAwait自体のカウントダウンは進むので、何もしてないFutureでもタイムアウトになる。
scala> Future(sleep); Future(sleep); Future(sleep); Future(sleep); Await.result(Future(1), 1.seconds)
java.util.concurrent.TimeoutException: Futures timed out after [1 second]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:116)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:116)
... 33 elided
scala> done
done
done
done
// 3つだけなら、実行可能なスレッドが1つ開いてるのでうまく動く。
scala> Future(sleep); Future(sleep); Future(sleep); Await.result(Future(1), 1.seconds)
res156: Int = 1
scala> done
done
done
// 追記:blockingを使うと、いけますが上限のスレッド数を超えて動くので注意。
scala> def sleep = blocking { Thread.sleep(5000); println("done") }
sleep: Unit
scala> Future(sleep); Future(sleep); Future(sleep); Future(sleep); Await.result(Future(1), 1.seconds)
res0: Int = 1
scala> done
done
done
done
ブロッキングに気づかないで、あきらかに軽い処理なのにタイムアウトする...みたいなことが起きる。起きた。
ふむふむふむ