概要
PlayではNon-BlockingIOを押しているので、処理を非同期(Future)ベースで書くのが一般的かと思います。
しかし、とりあえずimport scala.concurrent.ExecutionContext.Implicits.global
とかやるとリソースを活かしきれないケースがあるとか、逆にpoolSizeを大きくするとスレッドのスイッチングコストがかかるとか色々複雑なことが言われています。
ここでは、そこまで深くは触れませんが、複数のthreadPoolを使い分けれれうようにしようと思います。
(JVMのスレッドの仕組みに詳しい方がいたらぜひ教えて下さい。)
環境
- Play 2.5
ソースコードはこちら。
ThreadPoolにどんな種類があるのか?
どうやら、default thread poolは、ActorSystemのdefault dispatcherから取得しているようです。
また、ここでいうdefault thread poolは、
play.api.libs.concurrent.Execution.Implicits._
で読み込まれるもののようです。
複数のthread pool を使いたい場合は、以下のように別のDispatcherを用意してあげればいいようです。
my-context {
fork-join-executor {
parallelism-factor = 20.0
parallelism-max = 200
}
}
ActorSystemのDispatcherとは何か?
my-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
executorのタイプがfork-join-executor前提で、Akkaを使わない場合であれば、
- parallelism-min
- 同時実行の最小数
- parallelism-factor
- CPU coreあたりの同時実行数
- parallelism-max
- 同時実行の最大数
を見れば良さそうです。
Action.asyncを実装する
def asyncAction = Action.async {
import play.api.libs.concurrent.Execution.Implicits._
Future{"hello"}.map(Ok(_))
}
このように、Action.asyncを使って、返り値をFurue[Result]
にすればOK
ThreadPoolの挙動を見てみる
試しに、Futureで1秒待機させる処理を10個間に挟んでみます。
並列度が高いほどたくさん処理してくれるはずです。
また、ここではCore数4のPCで試してみます。
@Singleton
class AsyncController @Inject() (actorSystem: ActorSystem, context: ExecutionContext) extends Controller {
def action1 = Action.async {
//ExecutionContextを読み込む
execution()
}
def execution()(implicit ec: ExecutionContext): Future[Result] ={
val fList = (1 to 10).map(_ => Future{Thread.sleep(1000);println(Thread.currentThread.getName + ": " + DateTime.now)})
Await.result(Future.sequence(fList), Duration.Inf)
println("finish")
Future{"hello"}.map(Ok(_))
}
akka {
actor-system = "myActorSystem"
myDispatcher {
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 1.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 100
}
}
myDispatcher2 {
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 100
}
}
actor {
default-dispatcher {
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 3.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 100
}
}
}
}
play.api.libs.concurrent.Execution.Implicits._
application-akka.actor.default-dispatcher-9: 2016-08-12T15:42:05.391+09:00
application-akka.actor.default-dispatcher-8: 2016-08-12T15:42:05.391+09:00
application-akka.actor.default-dispatcher-4: 2016-08-12T15:42:05.391+09:00
application-akka.actor.default-dispatcher-5: 2016-08-12T15:42:05.392+09:00
application-akka.actor.default-dispatcher-10: 2016-08-12T15:42:05.391+09:00
application-akka.actor.default-dispatcher-7: 2016-08-12T15:42:05.391+09:00
application-akka.actor.default-dispatcher-12: 2016-08-12T15:42:05.392+09:00
application-akka.actor.default-dispatcher-11: 2016-08-12T15:42:05.392+09:00
application-akka.actor.default-dispatcher-2: 2016-08-12T15:42:05.391+09:00
application-akka.actor.default-dispatcher-3: 2016-08-12T15:42:05.391+09:00
default-dispatcherは、
core 4 * 並列度3.0 = 12なので一秒以内に全ての処理を行っています。
ちなみに、ここでDIされている
actorSystem.dispatcher
context: ExecutionContext
も同じ結果になりました。いずれもDefaultのDispatcherを使っているようです。
actorSystem.dispatchers.lookup("myDispatcher")
application-myDispatcher-39: 2016-08-12T15:47:36.442+09:00
application-myDispatcher-40: 2016-08-12T15:47:36.445+09:00
application-myDispatcher-38: 2016-08-12T15:47:36.445+09:00
application-myDispatcher-42: 2016-08-12T15:47:36.445+09:00
application-myDispatcher-39: 2016-08-12T15:47:37.442+09:00
application-myDispatcher-40: 2016-08-12T15:47:37.445+09:00
application-myDispatcher-38: 2016-08-12T15:47:37.445+09:00
application-myDispatcher-42: 2016-08-12T15:47:37.445+09:00
application-myDispatcher-39: 2016-08-12T15:47:38.443+09:00
application-myDispatcher-40: 2016-08-12T15:47:38.445+09:00
myDispatcherはcore当たりの並列度が1なので、4スレッドで走ります。
threadの番号を見ても、4つしか出てきていないのが分かるかと思います。
ちなみに、このThreadPoolと全く同じ設定ではないですが、scala.concurrent.ExecutionContext.Implicits.global
もcore数分の並列度を持つため、今回は4つずつ処理されます。
actorSystem.dispatchers.lookup("myDispatcher2")
application-myDispatcher2-30: 2016-08-12T15:46:17.673+09:00
application-myDispatcher2-32: 2016-08-12T15:46:17.673+09:00
application-myDispatcher2-31: 2016-08-12T15:46:17.673+09:00
application-myDispatcher2-34: 2016-08-12T15:46:17.674+09:00
application-myDispatcher2-35: 2016-08-12T15:46:17.674+09:00
application-myDispatcher2-33: 2016-08-12T15:46:17.675+09:00
application-myDispatcher2-36: 2016-08-12T15:46:17.675+09:00
application-myDispatcher2-37: 2016-08-12T15:46:17.675+09:00
application-myDispatcher2-30: 2016-08-12T15:46:18.673+09:00
application-myDispatcher2-32: 2016-08-12T15:46:18.673+09:00
myDispatcher2はcore当たりの並列度が2なので、8スレッドで走ります。
まとめ
Playでは非同期処理を中心に組み立てていくとblockせずに効率の良いアプリケーションがかけるので、とりあえずasync使っていきましょう。
(詳しい方がいれば、ThreadPoolもチューニングすると更に性能が良くなりそうです)