LoginSignup
11
8

More than 5 years have passed since last update.

Awaitの仕組みを説明してみる

Last updated at Posted at 2015-06-25

scala.concurrent.Await周りのコードを読んだのでメモ。

使用例

Await.result(Future(something), 3.seconds)

説明

Await.resultの定義を見ると、

concurrent/package.scala
object Await {
  ...
  def result[T](awaitable: Awaitable[T], atMost: Duration): T =
      blocking(awaitable.result(atMost)(AwaitPermission))

Awaitableを受け取っている。当然FutureはAwaitableを継承していることになる。

concurrent/Future.scala
trait Future[+T] extends Awaitable[T] {
  ...

resultメソッドを見てみる。FutureがAwaitableを継承しているが、実装はDefaultPromiseにある。

concurrent/impl/Promise.scala
class DefaultPromise {
  ...
  def result(atMost: Duration)(implicit permit: CanAwait): T =
    ready(atMost).value.get.get

readyを呼んでいる↓

concurrent/impl/Promise.scala
class DefaultPromise {
  ...

  def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
    if (tryAwait(atMost)) this
    else throw new TimeoutException("Futures timed out after [" + atMost + "]")

tryAwaitを呼んでいる↓

concurrent/impl/Promise.scala
class DefaultPromise {
  ...

  protected final def tryAwait(atMost: Duration): Boolean = if (!isCompleted) {
      ...
      atMost match {
        ...
        case f: FiniteDuration   =>
          if (f > Duration.Zero) {
            val l = new CompletionLatch[T]()
            onComplete(l)(InternalCallbackExecutor)
            l.tryAcquireSharedNanos(1, f.toNanos)
          }
      }

      isCompleted
    } else true // Already completed

tryAcquireSharedNanosが、指定時間ブロッキングする。

もし実行中の処理が終わって、onCompleteのCompletionLatchが実行されるとtryAcquireSharedNanosがreleaseされて、isCompletedが返る。(完了してるのでtrue)

もし実行中の処理が終わる前に、tryAcquireSharedNanosのブロッキングが終わってしまったら、isCompletedが返り(完了してないのでfalse)、throw TimeoutExceptionされる。

後は、value(完了してるので値がある)を返すだけ。

ちなみにCompletionLatchは、AbstractQueuedSynchronizerを使っている。

concurrent/impl/Promise.scala
private final class CompletionLatch[T] extends AbstractQueuedSynchronizer with (Try[T] => Unit) {
      override protected def tryAcquireShared(ignored: Int): Int = if (getState != 0) 1 else -1
      override protected def tryReleaseShared(ignore: Int): Boolean = {
        setState(1)
        true
      }
      override def apply(ignored: Try[T]): Unit = releaseShared(1)
    }

結構カオスなコードだと思うけど、自分で書けと言われてもきれいに書ける自信ない。やはりscalazのFutureは読み見やすいのでいいなぁと思った。

おまけ

こっちにも書いてますが、スレッドが全部使われているとブロッキングするので予期せずタイムアウトしてしまうので注意。

// コア数は4つなので、デフォルトで作成可能なスレッドは4つ。
scala> Runtime.getRuntime.availableProcessors
res154: Int = 4

// インポート
scala> import scala.concurrent._, duration._, ExecutionContext.Implicits.global
import scala.concurrent._
import duration._
import ExecutionContext.Implicits.global

// 5秒待つ関数定義
scala> def sleep = { Thread.sleep(5000); println("done") }
sleep: Unit

// Future4つででスレッド4つスリープしてる状態で、Awaitすると実行可能なスレッドがないのでタスクが待ちになる。
// しかしAwait自体のカウントダウンは進むので、何もしてないFutureでもタイムアウトになる。
scala> Future(sleep); Future(sleep); Future(sleep); Future(sleep); Await.result(Future(1), 1.seconds)
java.util.concurrent.TimeoutException: Futures timed out after [1 second]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:116)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:116)
  ... 33 elided

scala> done
done
done
done

// 3つだけなら、実行可能なスレッドが1つ開いてるのでうまく動く。
scala> Future(sleep); Future(sleep); Future(sleep); Await.result(Future(1), 1.seconds)
res156: Int = 1

scala> done
done
done

// 追記:blockingを使うと、いけますが上限のスレッド数を超えて動くので注意。
scala> def sleep = blocking { Thread.sleep(5000); println("done") }
sleep: Unit

scala> Future(sleep); Future(sleep); Future(sleep); Future(sleep); Await.result(Future(1), 1.seconds)
res0: Int = 1

scala> done
done
done
done

ブロッキングに気づかないで、あきらかに軽い処理なのにタイムアウトする...みたいなことが起きる。起きた。

ふむふむふむ

11
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
8