Help us understand the problem. What is going on with this article?

Scala で Future 使用時にとりあえず書いとく import ExecutionContext.Implicits.global から紐解く ExecutionContext の使い方

More than 3 years have passed since last update.

概要

scala で Future を使用する際に、なんとなく以下を記述している人もいるのかなと思います。

import ExecutionContext.Implicits.global

これを書いていないと、以下のようなコンパイルエラーになります。

Cannot find an implicit ExecutionContext.

この記事では ExecutionContext.Implicits.global について簡単な解説をします。
その後で、ExecutionContext の使い方について考察します。

ExecutionContext とは

ExecutionContext とは、簡単に言ってしまうと、タスクを非同期で実行する仕組みのインタフェースです。trait の定義は以下ですが、重要なのは execute メソッドです。

scala.concurrent.ExecutionContext.scala
trait ExecutionContext {
  def execute(runnable: Runnable): Unit
  def reportFailure(@deprecatedName('t) cause: Throwable): Unit
  def prepare(): ExecutionContext = this
}

つまり、非同期で実行したい処理を Runnable にして ExecutionContext.execute メソッドに渡すと、後は良しなに実行してくれる、というわけです。

※と言われてもピンと来ない方もいるかもしれませんが、非同期処理をスレッドを使って自前で実装すると、スレッド数や実行順序の管理など行う必要があってちょっと面倒です。そこを良しなにやってくれるので助かるわけです。

Future は引数のブロックを Runnable でラップして ExecutionContext.execute メソッドに渡しています。そのため、ExecutionContext の実装を指定する必要があるのです。
※詳しくはこちら

ExecutionContext の実装には様々なものがあって、極論を言うと同期処理するように実装することも可能です。

ExecutionContext.Implicits.global の実装

ForkJoin フレームワークというものになっています。

ForkJoin フレームワークでは、指定された数のワーカースレッドを保持しておいて、execute メソッドに渡されたタスクをいずれかのワーカースレッドに処理させます。タスクはワーカースレッド別のキューで管理されるのですが、一部のワーカーが多忙で他が暇になっている、などということはありません。これは work-stealing という、暇になったワーカーが他のワーカーのキューに入っているタスクを盗んで処理する仕組み、を持っているためです。つまり、ワーカースレッドをプールしておくことでスレッド生成のオーバーヘッドをなくし、かつ、各スレッドを遊ばせておかない仕組みになっているのです。

scala のバージョンと ForkJoin の実装

ForkJoinPool は java7 で導入されました。最新の scala 2.12 は jvm8 以上対応なので、ForkJoinPool は jvm のものを利用しているそうです。
scala 2.11 までは jvm6 に対応しているので、ForkJoinPool やそれに付随するクラスは scala プロジェクトで別に実装されています。(jvm の実装を基にしているっぽいです)

以下では scala 2.11.x を用います。

自前の ExecutionContext を実装してみる

以下のサンプルコードで、ExecutionContext を変えて試してみることにします。
Future で 100ms 後に "future complete" と出力するだけのものです。

ExecutorSample.scala
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object ExecutorSample {
  def main(args: Array[String]){
    import ExecutionContext.Implicits.global

    Future {
      Thread.sleep(100)
      println("future complete: thread=" + Thread.currentThread().getId)
    }

    println("main sleep: thread=" + Thread.currentThread().getId)
    Await.ready(f, Duration.Inf)
  }
}

上ではデフォルトの ExecutionContext.Implicits.global を使用しているので、以下のような結果になります。

main sleep: thread=1
future complete: thread=10

future complete が遅れて出力されていますし、別スレッドで処理されていることが分かります。

タスクを同期処理する実装

タスクを呼び出し元のスレッドで処理する ExecutionContext を実装します。

executor.SyncExecutor.scala
package executor

import scala.concurrent.ExecutionContext

class SyncExecutor extends ExecutionContext{
  override def execute(runnable: Runnable): Unit = runnable.run

  override def reportFailure(cause: Throwable): Unit = cause.printStackTrace
}

object SyncExecutor {
  implicit val global = SyncExecutor()

  def apply() = {
    new SyncExecutor()
  }
}

先ほどの例で ExecutionContext を import している箇所を以下のように変更します。

import executor.SyncExecutor.global

実行すると、結果は以下になります。

future complete: thread=1
main sleep: thread=1

future が main と同じスレッドで実行されていますね。

タスクを非同期処理する実装

タスクを呼び出し元とは別のスレッドで処理する ExecutionContext を実装します。

executor.AsyncExecutor.scala
package executor

import scala.concurrent.ExecutionContext

class AsyncExecutor extends ExecutionContext{
  override def execute(runnable: Runnable): Unit = new Thread(runnable).start

  override def reportFailure(cause: Throwable): Unit = cause.printStackTrace
}

object AsyncExecutor {
  implicit val global = AsyncExecutor()

  def apply() = {
    new AsyncExecutor()
  }
}

先ほどの例で ExecutionContext を import している箇所を以下のように変更します。

import executor.AsyncExecutor.global

実行すると、結果は以下になります。

main sleep: thread=1
future complete: thread=10

future が別スレッドで実行されました。

デフォルトの ExecutionContext の効果的な使い方

上で述べたように、ExecutionContext を自前で実装することができますが、殆どの場合はデフォルトの ExecutionContext を使用した方がいいでしょう。

ということで、効果的な使い方について解説します。

デフォルトの ForkJoinPool の設定

ワーカースレッドの数は以下で決定されます。

scala.concurrent.forkjoin.ForkJoinPool.java
    val desiredParallelism = range(
      getInt("scala.concurrent.context.minThreads", "1"),
      getInt("scala.concurrent.context.numThreads", "x1"),
      getInt("scala.concurrent.context.maxThreads", "x1"))

getInt の第二引数はデフォルト値で、"x1" はコア数を意味しています。つまり、デフォルトではコア数=スレッド数になります。

デフォルト値で使用してみる

手元のマシンは4コアなので、4スレッドで実行されるはずです。

以下のプログラムでは、100個の Future を作成しています。各Future は 100ms sleep します。
※スレッド生成にかかる時間を除くため、exec メソッドを二度実行しています。

ExecutorSample2.scala
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object ExecutorSample2 {
  def main(args: Array[String]){
    import ExecutionContext.Implicits.global

    System.setProperty("scala.concurrent.context.numThreads", "x1")
    System.setProperty("scala.concurrent.context.maxThreads", "x1")

    exec(true)
    exec(false)
  }

  def exec(isDummy:Boolean)(implicit executor: ExecutionContext) = {
    val start = System.currentTimeMillis()
    val f: Future[List[Int]] = Future.traverse((1 to 100).toList) { i =>
      Future {
        Thread.sleep(100)
        //println("id=" + Thread.currentThread().getId)
        i
      }
    }

    Await.ready(f, Duration.Inf)
    if(!isDummy)println("elapsed time=" + (System.currentTimeMillis() - start))
  }
}

2505ms かかりました。4スレッドなので、1スレッド平均 25個の Future を処理するので、妥当ですね。

ワーカースレッドを増やしてみる

コア数×10のワーカースレッドを作成するようにしてみます。先ほどのコードに以下を追記します。

    System.setProperty("scala.concurrent.context.numThreads", "x10")
    System.setProperty("scala.concurrent.context.maxThreads", "x10")

304ms かかりました。40スレッドなので、1スレッド平均 2.5個(つまり多いもので3個)の Future を処理するので、妥当です。

コア数x25 に増やしてみると、113ms かかりました。これ以上はスレッド数を増やしても、処理時間は変わりません。(当たり前ですが)

というわけでして、本例のような単純(各処理が完全に独立しているため競合が発生しない)な場合は、タスク数=ワーカースレッド数とするのが最適かなと思います。

複数の ExecutionContext を使ってみる

ExecutionContext は複数使用することができます。
例えばですが、ウェブアプリケーションにおいて、ユーザからのリクエストを処理する Context と、外部の WEBAPI にアクセスする Context を別にする、などが考えられると思います。そうしておけば、WEBAPI に何らかの問題が発生してレスポンスが遅くなった場合でも、多くのスレッドが WEBAPI 待ちになるのを防ぐことができ、WEBAPI を必要としないリクエストについては遅延なく処理することが可能になります。

上記を想定して、以下のケースで実験します。
二種類のタスクがあるとします。

  • 軽くて(処理時間=100ms)、発生頻度が高いタスク
  • 重くて(処理時間=5000ms)、発生頻度が低いタスク

この二つのタスクを

  • 同じ ExecutionContext で処理する
  • タスク種類別に異なる ExecutionContext で処理する

の二つの場合で、処理時間がどのように変わるかを比較します。

同じ ExecutionContext で処理する場合

コア数(私の環境では4)×2 = 8 スレッドの ExecutionContext を一つ用意します。
100ms ごとに軽いタスクを一つ発生させます。
2000ms ごとに重いタスクを一つ発生させます。
これを 100タスク分行います。

これだけだと、スレッド数に対して処理負荷が少ないため、発生したタスクは瞬時にワーカースレッドに拾われて処理されます。しかし、瞬間的に重いタスクが多数発生したらどうなるでしょうか。プログラム実行から約 5000ms 後に、重いタスクを 5個発生させることにします。こうしておくと、重いタスクに 8スレッド全てが占有される時間帯が発生し、軽いタスクの処理が遅延します。

以下、サンプルコードです。

ExecutorSampleSame.scala
import scala.collection.mutable
import scala.concurrent._

object ExecutorSampleSame {

  val lightResults = new mutable.MutableList[Long]
  val heavyResults = new mutable.MutableList[Long]

  System.setProperty("scala.concurrent.context.numThreads", "x2")
  System.setProperty("scala.concurrent.context.maxThreads", "x2")
  val commonExecutionContext = ExecutionContext.fromExecutor(null)


  def main(args: Array[String]){
    (1 to 100).toList.map { i =>
      if(i % 20 == 0) heavy(false) else light(false)
      if(i == 50){
        (1 to 5).toList.map{i=>heavy(false)}
      }
      Thread.sleep(100)
    }

    Thread.sleep(10 * 1000)

    val lightAvg = lightResults.sum / lightResults.size
    println ("lightAvg=" + lightAvg)
  }

  def light(isDummy:Boolean) = {
    implicit val executionContext:ExecutionContext = commonExecutionContext
    val start = System.currentTimeMillis()
    Future {
      Thread.sleep(100)
      val elapsedTime = System.currentTimeMillis() - start
      lightResults += elapsedTime
    }
  }

  def heavy(isDummy:Boolean) = {
    implicit val executionContext:ExecutionContext = commonExecutionContext
    val start = System.currentTimeMillis()
    Future {
      Thread.sleep(5000)
      val elapsedTime = System.currentTimeMillis() - start
      heavyResults += elapsedTime
    }
  }
}

出力結果は以下です。

lightAvg=422

軽いタスクの処理時間の平均が 422ms になりました。
通常であれば 100ms で処理されるわけですから、由々しき事態です。

タスクの種類別の ExecutionContext で処理する場合

タスクの種類別に ExecutionContext を分けてみます。
上の例と合計スレッド数を同じにするために、各 Context のスレッド数を 4 にします。

以下、サンプルコードです。

ExecutorSampleAnother.scala
import scala.collection.mutable
import scala.concurrent._

object ExecutorSampleAnother {

  val lightResults = new mutable.MutableList[Long]
  val heavyResults = new mutable.MutableList[Long]

  System.setProperty("scala.concurrent.context.numThreads", "x1")
  System.setProperty("scala.concurrent.context.maxThreads", "x1")
  val lightExecutionContext = ExecutionContext.fromExecutor(null)
  val heavyExecutionContext = ExecutionContext.fromExecutor(null)


  def main(args: Array[String]){
    (1 to 100).toList.map { i =>
      if(i % 20 == 0) heavy(false) else light(false)
      if(i == 50){
        (1 to 5).toList.map{i=>heavy(false)}
      }
      Thread.sleep(100)
    }

    Thread.sleep(10 * 1000)

    val lightAvg = lightResults.sum / lightResults.size
    println ("lightAvg=" + lightAvg)
  }

  def light(isDummy:Boolean) = {
    implicit val executionContext:ExecutionContext = lightExecutionContext
    val start = System.currentTimeMillis()
    Future {
      Thread.sleep(100)
      val elapsedTime = System.currentTimeMillis() - start
      lightResults += elapsedTime
    }
  }

  def heavy(isDummy:Boolean) = {
    implicit val executionContext:ExecutionContext = heavyExecutionContext
    val start = System.currentTimeMillis()
    Future {
      Thread.sleep(5000)
      val elapsedTime = System.currentTimeMillis() - start
      heavyResults += elapsedTime
    }
  }
}

実行結果は以下です。

lightAvg=100

軽いタスクは遅延なく実行されました。

まとめ

もう ExecutionContext は怖くない・・・かも。

qri
金融工学とITを融合し、新たな価値を創造する専門会社
http://www.quantsresearch.com
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away