Finagle で FuturePool.unboundedPool
ではなく FixedThreadPool
を使った FuturePool
を使う為の方法を調べてみた。
環境
試した環境は以下。
name := "finagle-example"
scalaVersion := "2.11.7"
resolvers += "twitter-repo" at "https://maven.twttr.com"
libraryDependencies ++= Seq(
"com.twitter" %% "finagle-http" % "6.34.0"
)
FuturePool が無い場合
まずは FuturePool
無しで実行した場合の挙動を確認してみる。
以下のような簡単なサンプルを書いてみた。
import com.twitter.finagle.{http, Http, Service, SimpleFilter}
import com.twitter.util.{Await, Future}
import com.twitter.logging.Logger
class LoggingFilter extends SimpleFilter[http.Request, http.Response] {
private val log = Logger.get(getClass)
def apply(request: http.Request, service: Service[http.Request, http.Response]) = {
log.info(s"request start. thread_id: ${Thread.currentThread.getId}")
val f = service(request)
log.info(s"request end. thread_id: ${Thread.currentThread.getId}")
f
}
}
class HelloService extends Service[http.Request, http.Response] {
private val log = Logger.get(getClass)
override def apply(request: http.Request): Future[http.Response] = {
val response = http.Response()
val message = "Hello, Finagle."
log.info(s"message: ${message} thread_id: ${Thread.currentThread.getId}")
response.setContentString(s"${message}\n");
Future.value(response)
}
}
object HTTPServer {
private val log = Logger.get(getClass)
val loggingFilter = new LoggingFilter
val helloService = new HelloService
def main(args: Array[String]):Unit = {
val service = loggingFilter andThen helloService
val server = Http.serve(":8080", service)
Await.ready(server)
}
}
sbt run
で起動し、何回かリクエストを送信してみると、以下のようなログが出力された。
% sbt run
[info] Set current project to finagle-example (in build file:/home/akishin/src/scala/finagle_ex/)
[info] Compiling 1 Scala source to /home/akishin/src/scala/finagle_ex/target/scala-2.11/classes...
[info] Running HTTPServer
3 31, 2016 10:43:37 午後 com.twitter.finagle.Init$$anonfun$1 apply$mcV$sp
情報: Finagle version 6.34.0 (rev=44f444f606b10582c2da8d5770b7879ddd961211) built at 20160310-155158
3 31, 2016 10:44:38 午後 LoggingFilter apply
INFO: request start. thread_id: 22
3 31, 2016 10:44:38 午後 HelloService apply
INFO: message: Hello, Finagle. thread_id: 22
3 31, 2016 10:44:38 午後 LoggingFilter apply
INFO: request end. thread_id: 22
3 31, 2016 10:44:38 午後 LoggingFilter apply
INFO: request start. thread_id: 23
3 31, 2016 10:44:38 午後 HelloService apply
INFO: message: Hello, Finagle. thread_id: 23
3 31, 2016 10:44:38 午後 LoggingFilter apply
INFO: request end. thread_id: 23
3 31, 2016 10:44:38 午後 LoggingFilter apply
INFO: request start. thread_id: 24
3 31, 2016 10:44:38 午後 HelloService apply
INFO: message: Hello, Finagle. thread_id: 24
3 31, 2016 10:44:38 午後 LoggingFilter apply
INFO: request end. thread_id: 24
取り敢えずブロッキングするような処理は何も無いので、 thread_id を見るとリクエスト毎に同じスレッドで順番に実行されている事が判る。
FuturePool.unboundedPool
次に Service
を FuturePool.unboundedPool
で実行してみる。
コードは以下のようになった。
import com.twitter.finagle.{http, Http, Service, SimpleFilter}
import com.twitter.util.{Await, Future, FuturePool}
import com.twitter.logging.Logger
class LoggingFilter extends SimpleFilter[http.Request, http.Response] {
private val log = Logger.get(getClass)
def apply(request: http.Request, service: Service[http.Request, http.Response]) = {
log.info(s"request start. thread_id: ${Thread.currentThread.getId}")
val f = service(request)
log.info(s"request end. thread_id: ${Thread.currentThread.getId}")
f
}
}
class HelloService extends Service[http.Request, http.Response] {
private val log = Logger.get(getClass)
override def apply(request: http.Request): Future[http.Response] = FuturePool.unboundedPool {
val response = http.Response()
val message = "Hello, Finagle."
log.info(s"message: ${message} thread_id: ${Thread.currentThread.getId}")
response.setContentString(s"${message}\n");
response
}
}
object HTTPServer {
private val log = Logger.get(getClass)
val loggingFilter = new LoggingFilter
val helloService = new HelloService
def main(args: Array[String]):Unit = {
val service = loggingFilter andThen helloService
val server = Http.serve(":8080", service)
Await.ready(server)
}
}
sbt run
で起動し、同様にして何回かリクエストを送信してみる。
% sbt run
[info] Set current project to finagle-example (in build file:/home/akishin/src/scala/finagle_ex/)
[info] Compiling 1 Scala source to /home/akishin/src/scala/finagle_ex/target/scala-2.11/classes...
[info] Running HTTPServer
3 31, 2016 10:47:57 午後 com.twitter.finagle.Init$$anonfun$1 apply$mcV$sp
情報: Finagle version 6.34.0 (rev=44f444f606b10582c2da8d5770b7879ddd961211) built at 20160310-155158
3 31, 2016 10:48:28 午後 LoggingFilter apply
INFO: request start. thread_id: 21
3 31, 2016 10:48:28 午後 LoggingFilter apply
INFO: request end. thread_id: 21
3 31, 2016 10:48:28 午後 HelloService$$anonfun$apply$1 apply
INFO: message: Hello, Finagle. thread_id: 26
3 31, 2016 10:48:28 午後 LoggingFilter apply
INFO: request start. thread_id: 22
3 31, 2016 10:48:28 午後 LoggingFilter apply
INFO: request end. thread_id: 22
3 31, 2016 10:48:28 午後 HelloService$$anonfun$apply$1 apply
INFO: message: Hello, Finagle. thread_id: 26
3 31, 2016 10:48:28 午後 LoggingFilter apply
INFO: request start. thread_id: 23
3 31, 2016 10:48:28 午後 LoggingFilter apply
INFO: request end. thread_id: 23
3 31, 2016 10:48:28 午後 HelloService$$anonfun$apply$1 apply
INFO: message: Hello, Finagle. thread_id: 26
今度は Service
のログのみ別のスレッドになり、またログの出力順もフィルタで出力している request end.
の後に Service
のログが出るようになった。
試しに VisualVM でアタッチしてスレッドを見ながら ab
で適当に負荷かけてみると以下のようにスレッドがたくさん生成されるのが判る。
FuturePool.unboundedPool
の実装を見てみると java.util.concurrent.Executors.newCachedThreadPool
を元に作られていた。
https://github.com/twitter/util/blob/develop/util-core/src/main/scala/com/twitter/util/FuturePool.scala#L42-L55
scala:FuturePool.scala
private lazy val defaultExecutor = Executors.newCachedThreadPool(
new NamedPoolThreadFactory("UnboundedFuturePool", makeDaemons = true)
)
・・・
lazy val unboundedPool: FuturePool =
new ExecutorServiceFuturePool(defaultExecutor)
java.util.concurrent.Executors.newCachedThreadPool
の javadoc を見てみると以下のように書いてある。
必要に応じ、新規スレッドを作成するスレッドプールを作成しますが、利用可能な場合には以前に構築されたスレッドを再利用します。
通常、これらのプールは、短期の非同期タスクを多数実行するプログラムのパフォーマンスを改善します。
execute を呼び出すと、以前に構築されたスレッドが利用可能であれば、それを再利用します。
既存のスレッドが使用できない場合、新しい接続が作成され、プールに追加されます。
60 秒間使用されなかったスレッドは、終了して、キャッシュから削除されます。
このため、長い間アイドル状態のプールによりリソースが消費されることはありません。
類似のプロパティーを持つが、詳細の異なるプール (timeout パラメータなど) は、ThreadPoolExecutor コンストラクタを使用して作成できます。
特に生成するスレッドの最大数を指定するパラメータなどは無いようなので、リクエストが多くなればメモリが許す限り新しいスレッドが作成されるっぽい。
FixedThreadPool を使った FuturePool
Web のように不特定多数のリクエストを受ける環境の場合、リクエストによって無制限にスレッドが作られてしまうのはちょっと怖いような気がする。
ということで生成するスレッド数を固定に出来る FixedThreadPool
を使ってみた。
FuturePool.unboundedPool
に習って NamedPoolThreadFactory
を使って名前を付けている。
import com.twitter.finagle.{http, Http, Service, SimpleFilter}
import com.twitter.util.{Await, Future, FuturePool}
import com.twitter.logging.Logger
import com.twitter.concurrent.NamedPoolThreadFactory
import java.util.concurrent.Executors
class LoggingFilter extends SimpleFilter[http.Request, http.Response] {
private val log = Logger.get(getClass)
def apply(request: http.Request, service: Service[http.Request, http.Response]) = {
log.info(s"request start. thread_id: ${Thread.currentThread.getId}")
val f = service(request)
log.info(s"request end. thread_id: ${Thread.currentThread.getId}")
f
}
}
class HelloService extends Service[http.Request, http.Response] {
private val log = Logger.get(getClass)
// 生成されるスレッドの最大値を 5 に指定した FixedThreadPool
val fixedThreadExecutor = Executors.newFixedThreadPool(5,
new NamedPoolThreadFactory("FixedFuturePool", makeDaemons = true)
)
val futurePool: FuturePool = FuturePool(fixedThreadExecutor)
override def apply(request: http.Request): Future[http.Response] = futurePool {
val response = http.Response()
val message = "Hello, Finagle."
log.info(s"message: ${message} thread_id: ${Thread.currentThread.getId}")
response.setContentString(s"${message}\n");
response
}
}
object HTTPServer {
private val log = Logger.get(getClass)
val loggingFilter = new LoggingFilter
val helloService = new HelloService
def main(args: Array[String]):Unit = {
val service = loggingFilter andThen helloService
val server = Http.serve(":8080", service)
Await.ready(server)
}
}
これで ab
で大量にリクエストしてみた時のスレッドの状態を VisualVM で見てみる。
スレッドが名前順で並んでくれなかったので判りづらいが、 ab
の concurrency
を上げてもスレッドは最大で 5 までしか生成されなくなった。
結論
本番環境では FixedThreadPool
を使い最大スレッド数は固定にしておいて、サーバスペックや負荷を見ながら調整していくのがよさげ。