概要
PlayFrameworkやAkka HTTPをつかっていて、うまくパフォーマンスがでなかった経験ありませんか?
もしかしたらそれ、スレッドプールをうまく分割できていなかったからかもしれません。
PlayFrameworkやAkka HTTPなど、ノンブロッキングI/Oを前提につくられたライブラリやフレームワークでは、ブロッキングI/Oに十分な注意を払う必要があります。
具体的に言うと、ノンブロッキングI/Oの処理のみを実行するスレッドプールと、ブロッキングI/O(I/O待ちによりスレッドを占有する)を含む処理を実行するスレッドプールは分割し、それぞれ適切に設定する必要があります。
なぜスレッドプールをわけるか
ノンブロッキングI/Oはその特性上、I/O待機中にCPUやスレッドを専有せず、他の処理に割り当てることができます。
この特性を活かし、PlayFrameworkやAkka HTTPなどのノンブロッキングI/Oでリクエストを受け付けるWebフレームワークは、大量の同時接続を少ないスレッドで効率的に処理することができます。
ここにブロッキングI/Oを含む処理を同じスレッドプールのスレッドで実行すると、I/O待ちなスレッドがじわじわと増えていき、ともなってスレッド割当待ちの処理が増加し、全体的に動作が悪くなってしまいます。
(最初は調子よくさばいているのにじわじわレスポンスタイムが増えていって...などはこの症状かもしれません。)
Akkaのコアコミッターである、@ktosopl (Konrad Malawski) さんもScalaMatsuri2016の「The Zen of Akka」で言及されています。
http://www.slideshare.net/ktoso/zen-of-akka#39
PlayFrameworkやAkka HTTPでのデフォルトのスレッドプールは、ノンブロッキングI/O用のものになっているので、たとえばPlayFrameworkのAction内でブロッキングI/O処理を意識せずに書いてしまい、上記のような状態になっているかもしれません。
たいていのWebアプリケーションはデータの永続化層にRDBを用いると思います。
しかしJavaやScalaで一般的に利用されるJDBCはインターフェイスが同期的なシグネチャになっており、内部実装もそれに合わせてブロッキングI/Oとなっており、特に注意が必要です。
(SlickなどのFutureを返すライブラリでも、内部で利用しているJDBCがブロッキングI/Oなので、そのFutureを実行するスレッドに気をつける必要があります。)
どのような処理がブロッキング処理(スレッドをブロックする処理)になるのか
java.lang.Thread#sleep()
-
scala.concurrent.Await.result()
- Futureをユーザープログラム内でAwaitするのはやめたほうが極力さけたほうがよいです
- 主要なWebフレームワークはFutureのまま結果を扱えるようになっています
synchronized
-
java.io
パッケージでのIO処理 - 多くのJDBC実装
どのようにスレッドプールをわけるか
スレッドプールを分割するには、
- スレッドプールを生成する
- 生成したスレッドプールの実行コンテキストを利用する
上記で可能です。
スレッドプールの生成
スレッドプールを簡単に作成する方法は、Akka
のdispatcher
(ExecutionContextExecutor
)を利用する方法です。
application.conf
に定義するだけで、設定したスレッドプールが割り当てられたdispatcher
を、ActorSystem
から取得できます。
(参考) Akka Dispatchers
blocking-io-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 32
}
throughput = 1
}
non-blocking-io-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 10
}
throughput = 100
}
val system = ActorSystem()
// dispatcher(ExecutionContextExecutor)の取得
// ExecutionContextExecutorはExecutionContextを継承している
val defaultDispatcher = system.dispatcher // akka-actor default dispatcher
val blockingDispatcher = system.dispatchers.lookup("blocking-io-dispatcher")
val nonBlockingDispatcher = system.dispatchers.lookup("non-blocking-io-dispatcher")
なお、PlayFrameworkの場合は、play.api.libs.concurrent.Akka
というobject
が、PlayFramework内部で利用しているActorSystem
を提供しています。
import play.api.libs.concurrent.Akka
val defaultDispatcher = Akka.system.dispatcher // akka-actor default dispatcher
val blockingDispatcher = Akka.system.dispatchers.lookup("blocking-io-dispatcher")
val nonBlockingDispatcher = Akka.system.dispatchers.lookup("non-blocking-io-dispatcher")
この記事では言及しませんが、ブロッキングI/O、ノンブロッキングI/Oそれぞれ、下記のスレッドプールを扱うほうが良いようです。
- ノンブロッキングI/Oを扱うスレッドプール => ForkJoinPool
- ブロンキングI/Oを扱うスレッドプール => ThreadPoolExecutor
Futureの実行スレッドを変更する
取得したdispatcher
(ExectionContext
)を使って、apply
やmap
、flatMap
を行います。
val future = Future {
User.findById(userId) // jdbc access
}(blockingDispatcher).map { user =>
s"Hello ${user.name}!"
}(nonBlockingDispatcher)
akka-actorの実行スレッドを変更する
Actor
を生成する際に、利用するdispatcher
を指定できます。
(指定がない場合は、default-dispatcher
が利用されます。)
val actor = context.actorOf(Props[UseJdbcActor].withDispatcher("blocking-io-dispatcher"))
akka-streamの実行スレッドを変更する
Graph
(Source
, Flow
, Sink
の基底)のAttribute
を変更して、利用するdispatcher
を指定できます。
val blockingFlow = Flow[Long].map { id =>
User.findById(userId) // jdbc access
}.addAttributes(ActorAttributes.dispatcher("blocking-io-dispatcher"))
まとめ
- ブロッキングIOを含む処理のスレッドプールとノンブロッキングな処理のスレッドプールはわける
-
ActorSystem
やAkka.system
のdispatchers.lookup
をつかえば楽に任意のスレッドプールを生成し、利用できる