ScalaとFuture その3
- その1
- その2
-
Future
はRunnable
になってExecuteContext
で実行される - いつも適当にimportしている
scala.concurrent.ExecutionContext.Implicits.global
はどうやってFutureを実行するんだろう? - やっていくこと -> 1つの例として
scala.concurrent.ExecutionContext.Implicits.global
のExecutionContextがFutureを実行するコードをみていく- Futureが失敗したときのアレソレは意図的にスルーしていく
ExecutionContextのおさらい
- ソースコード
- Futureを実行する。traitで定義されている
-
execute
とreportFailure
くらいしかない
-
- executeは
Runnable
を取る
trait ExecutionContext {
def execute(runnable: Runnable): Unit
def reportFailure(cause: Throwable): Unit
}
- ExecutionContextそのものはRunnableの実行の方法を定めていないんだね。ExecutionContextによってはThreadが新しく作られることも、処理が非同期じゃないこともある。
- つまり、Futureである = 非同期である ではないんだね
- また、Futureである = 別のThreadで実行される でもないんだね
globalのExecutionContextのソースコードから読み解く
- いつもインポートしてるヤツ
-
impl.ExecutionContextImpl.fromExecutor
にnull
を渡した返り値がいつも使ってるExecutionContext
のようだね -
lazy
で宣言されているから一度呼び出されると使い回されるね
object Implicits {
implicit lazy val global: ExecutionContext = impl.ExecutionContextImpl.fromExecutor(null: Executor)
}
fromExecutor
-
fromExecutorの定義を見ると
ExecutionContextImpl
を生成しているようだ。
def fromExecutor(e: Executor, reporter: Throwable => Unit = ExecutionContext.defaultReporter): ExecutionContextImpl =
new ExecutionContextImpl(Option(e).getOrElse(createDefaultExecutorService(reporter)), reporter)
-
Option(e).getOrElse(createDefaultExecutorService(reporter))
を見るとglobal.executionContext
ではcreateDefaultExecutorService
から返るExecutorService
を使ったExecutionContextImpl
が生成されるのがわかる
ExecutionContextImpl
- ExecutionContextとExecutorを継承したトレイト
-
execute
の実装はコンストラクタにとったExecutor
に処理を移譲するようになっている
private[scala] class ExecutionContextImpl private[impl] (val executor: Executor, val reporter: Throwable => Unit) extends ExecutionContextExecutor {
override def execute(runnable: Runnable) = executor execute runnable
}
createDefaultExecutorService
-
global.executionContext
が処理を移譲するExecutorServiceを返す関数 -
ソースコードを見ると
ForkJoinPool
のexecute
を拡張したものを生成して返しているのがわかる
new ForkJoinPool(desiredParallelism, threadFactory, uncaughtExceptionHandler, true) {
override def execute(runnable: Runnable): Unit = {
val fjt: ForkJoinTask[_] = runnable match {
case t: ForkJoinTask[_] => t
case r => new ExecutionContextImpl.AdaptedForkJoinTask(r)
}
Thread.currentThread match {
case fjw: ForkJoinWorkerThread if fjw.getPool eq this => fjt.fork()
case _ => super.execute(fjt)
}
}
}
-
ForkJoinPool
を生成する際に引数で渡しているdesiredParallelism
,threadFactory
と第4引数のtrueが実行に関連しそうだ -
overrideされた
execute
を見ると引数にとったRunnableの型とexecute
が実行されているThreadによって処理が変わっている- 引数のRunnable(以下r)が
ForkJoinTask
でなかった場合コンストラクタにrをわたしてAdaptedForkJoinTask
を生成している - 次の処理で
execute
が実行されているThreadがForkJoinThread
で実行されているThreadと同一のThread
だった場合、ForkJoinTask
をfork
している - もし、
ForkJoinThread
でないもしくはForkJoinThread
だが、現在実行されているThread
とは別のThread
だった場合にはForkJoinPool
のexecute
にForkJoinTask
を渡している。
- 引数のRunnable(以下r)が
↑なるほどわからん
-
わかったことは
- ForkJoinPoolによってFutureは実行される
- 引数でわたってきたRunnableはForkJoinTaskではなかった場合にForkJoinTaskにラップされる
- 実行時のThreadがForkJoinPoolだったらTaskはforkされる
ForkJoinPool
- 複数のタスクを無駄なくスレッドに割り当てるThreadPoolの実装
- forkによってタスクの処理が中断されたときには、他のスレッドに割り当てられている実行待ちのタスクを処理しだすことが、joinによってできる(Work-stealing)
- 勝手なイメージだが、コンビニのレジみたいな感じ。温め待ちのときもレジの人は行列をさばき、隣のレジにある行列から「次のお客様どうぞー」だったりと積極的に仕事をする
fork(分割)
- Task(A)がTask(B)を生み出して処理を分割する
- forkしてjoinする場合にはTask(B)を生み出したTask(A)はBの終了まで中断され、Aを実行していたThreadはまた別のTaskをさばく
join
- TaskからforkされたTaskをjoinすることで、forkされたjoinの結果をfork元は待つことができる
- ThreadはforkされたTaskの結果を待つことなく、中断としてまた別のタスクを実行する
しかし
- globalのExecutionContextを読んでいてjoinしている様子はない
まとめ
-
scala.concurrent.ExecutionContext.Implicits.global
のExecutionContext
は、実行ThreadがForkJoinTaskだった場合にはforkによって処理は実行される。 -
join
をしない、ForkJoinPool
によって実行されてるようだ。