Scala

ScalaとFuture その3 ~globalのExecutionContextソースコードリーディング~

More than 1 year has passed since last update.

ScalaとFuture その3

  • その1
  • その2
  • FutureRunnableになってExecuteContextで実行される
  • いつも適当にimportしているscala.concurrent.ExecutionContext.Implicits.globalはどうやってFutureを実行するんだろう?
  • やっていくこと -> 1つの例としてscala.concurrent.ExecutionContext.Implicits.globalのExecutionContextがFutureを実行するコードをみていく
    • Futureが失敗したときのアレソレは意図的にスルーしていく

ExecutionContextのおさらい

  • ソースコード
  • Futureを実行する。traitで定義されている
    • executereportFailureくらいしかない
  • executeはRunnableを取る
trait ExecutionContext {
  def execute(runnable: Runnable): Unit

  def reportFailure(cause: Throwable): Unit
}
  • ExecutionContextそのものはRunnableの実行の方法を定めていないんだね。ExecutionContextによってはThreadが新しく作られることも、処理が非同期じゃないこともある。
  • つまり、Futureである = 非同期である ではないんだね
  • また、Futureである = 別のThreadで実行される でもないんだね

globalのExecutionContextのソースコードから読み解く

  • いつもインポートしてるヤツ
  • impl.ExecutionContextImpl.fromExecutornullを渡した返り値がいつも使ってるExecutionContextのようだね
  • lazyで宣言されているから一度呼び出されると使い回されるね
  object Implicits {
    implicit lazy val global: ExecutionContext = impl.ExecutionContextImpl.fromExecutor(null: Executor)
  }

fromExecutor

  def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl =
    new ExecutionContextImpl(Option(e).getOrElse(createDefaultExecutorService(reporter)), reporter)
  • Option(e).getOrElse(createDefaultExecutorService(reporter))を見るとglobal.executionContextではcreateDefaultExecutorServiceから返るExecutorServiceを使ったExecutionContextImplが生成されるのがわかる

ExecutionContextImpl

  • ExecutionContextとExecutorを継承したトレイト
  • executeの実装はコンストラクタにとったExecutorに処理を移譲するようになっている
private[scala] class ExecutionContextImpl private[impl] (val executor: Executor, val reporter: Throwable => Unit) extends ExecutionContextExecutor {
  override def execute(runnable: Runnable) = executor execute runnable
}

createDefaultExecutorService

  • global.executionContextが処理を移譲するExecutorServiceを返す関数
  • ソースコードを見るとForkJoinPoolexecuteを拡張したものを生成して返しているのがわかる
    new ForkJoinPool(desiredParallelism, threadFactory, uncaughtExceptionHandler, true) {
      override def execute(runnable: Runnable): Unit = {
        val fjt: ForkJoinTask[_] = runnable match {
          case t: ForkJoinTask[_] => t
          case r                  => new ExecutionContextImpl.AdaptedForkJoinTask(r)
        }
        Thread.currentThread match {
          case fjw: ForkJoinWorkerThread if fjw.getPool eq this => fjt.fork()
          case _                                                => super.execute(fjt)
        }
      }
    }
  • ForkJoinPoolを生成する際に引数で渡しているdesiredParallelism, threadFactoryと第4引数のtrueが実行に関連しそうだ
  • overrideされたexecuteを見ると引数にとったRunnableの型とexecuteが実行されているThreadによって処理が変わっている

    • 引数のRunnable(以下r)がForkJoinTaskでなかった場合コンストラクタにrをわたしてAdaptedForkJoinTaskを生成している
    • 次の処理でexecuteが実行されているThreadがForkJoinThreadで実行されているThreadと同一のThreadだった場合、ForkJoinTaskforkしている
    • もし、ForkJoinThreadでないもしくはForkJoinThreadだが、現在実行されているThreadとは別のThreadだった場合にはForkJoinPoolexecuteForkJoinTaskを渡している。
  • ↑なるほどわからん

  • わかったことは

    • ForkJoinPoolによってFutureは実行される
    • 引数でわたってきたRunnableはForkJoinTaskではなかった場合にForkJoinTaskにラップされる
    • 実行時のThreadがForkJoinPoolだったらTaskはforkされる

ForkJoinPool

  • 複数のタスクを無駄なくスレッドに割り当てるThreadPoolの実装
  • forkによってタスクの処理が中断されたときには、他のスレッドに割り当てられている実行待ちのタスクを処理しだすことが、joinによってできる(Work-stealing)
    • 勝手なイメージだが、コンビニのレジみたいな感じ。温め待ちのときもレジの人は行列をさばき、隣のレジにある行列から「次のお客様どうぞー」だったりと積極的に仕事をする

fork(分割)

  • Task(A)がTask(B)を生み出して処理を分割する
  • forkしてjoinする場合にはTask(B)を生み出したTask(A)はBの終了まで中断され、Aを実行していたThreadはまた別のTaskをさばく

join

  • TaskからforkされたTaskをjoinすることで、forkされたjoinの結果をfork元は待つことができる
    • ThreadはforkされたTaskの結果を待つことなく、中断としてまた別のタスクを実行する

しかし

  • globalのExecutionContextを読んでいてjoinしている様子はない

まとめ

  • scala.concurrent.ExecutionContext.Implicits.globalExecutionContextは、実行ThreadがForkJoinTaskだった場合にはforkによって処理は実行される。
  • joinをしない、ForkJoinPoolによって実行されてるようだ。