LoginSignup
6
4

More than 5 years have passed since last update.

Finagle で FixedThreadPool を使ってみる

Posted at

Finagle で FuturePool.unboundedPool ではなく FixedThreadPool を使った FuturePool を使う為の方法を調べてみた。

環境

試した環境は以下。

build.sbt
name := "finagle-example"
scalaVersion := "2.11.7"

resolvers += "twitter-repo" at "https://maven.twttr.com"

libraryDependencies ++= Seq(
  "com.twitter" %% "finagle-http" % "6.34.0"
)

FuturePool が無い場合

まずは FuturePool 無しで実行した場合の挙動を確認してみる。
以下のような簡単なサンプルを書いてみた。

import com.twitter.finagle.{http, Http, Service, SimpleFilter}
import com.twitter.util.{Await, Future}
import com.twitter.logging.Logger

class LoggingFilter extends SimpleFilter[http.Request, http.Response]  {
  private val log = Logger.get(getClass)

  def apply(request: http.Request, service: Service[http.Request, http.Response]) = {
    log.info(s"request start. thread_id: ${Thread.currentThread.getId}")
    val f = service(request)
    log.info(s"request end. thread_id: ${Thread.currentThread.getId}")
    f
  }
}

class HelloService extends Service[http.Request, http.Response] {
  private val log = Logger.get(getClass)

  override def apply(request: http.Request): Future[http.Response] = {
    val response = http.Response()
    val message  = "Hello, Finagle."
    log.info(s"message: ${message} thread_id: ${Thread.currentThread.getId}")
    response.setContentString(s"${message}\n");
    Future.value(response)
  }
}

object HTTPServer {

  private val log = Logger.get(getClass)

  val loggingFilter = new LoggingFilter
  val helloService  = new HelloService

  def main(args: Array[String]):Unit = {
    val service = loggingFilter andThen helloService
    val server = Http.serve(":8080", service)
    Await.ready(server)
  }
}

sbt run で起動し、何回かリクエストを送信してみると、以下のようなログが出力された。

% sbt run
[info] Set current project to finagle-example (in build file:/home/akishin/src/scala/finagle_ex/)
[info] Compiling 1 Scala source to /home/akishin/src/scala/finagle_ex/target/scala-2.11/classes...
[info] Running HTTPServer 
3 31, 2016 10:43:37 午後 com.twitter.finagle.Init$$anonfun$1 apply$mcV$sp
情報: Finagle version 6.34.0 (rev=44f444f606b10582c2da8d5770b7879ddd961211) built at 20160310-155158
3 31, 2016 10:44:38 午後 LoggingFilter apply
INFO: request start. thread_id: 22
3 31, 2016 10:44:38 午後 HelloService apply
INFO: message: Hello, Finagle. thread_id: 22
3 31, 2016 10:44:38 午後 LoggingFilter apply
INFO: request end. thread_id: 22
3 31, 2016 10:44:38 午後 LoggingFilter apply
INFO: request start. thread_id: 23
3 31, 2016 10:44:38 午後 HelloService apply
INFO: message: Hello, Finagle. thread_id: 23
3 31, 2016 10:44:38 午後 LoggingFilter apply
INFO: request end. thread_id: 23
3 31, 2016 10:44:38 午後 LoggingFilter apply
INFO: request start. thread_id: 24
3 31, 2016 10:44:38 午後 HelloService apply
INFO: message: Hello, Finagle. thread_id: 24
3 31, 2016 10:44:38 午後 LoggingFilter apply
INFO: request end. thread_id: 24

取り敢えずブロッキングするような処理は何も無いので、 thread_id を見るとリクエスト毎に同じスレッドで順番に実行されている事が判る。

FuturePool.unboundedPool

次に ServiceFuturePool.unboundedPool で実行してみる。
コードは以下のようになった。

import com.twitter.finagle.{http, Http, Service, SimpleFilter}
import com.twitter.util.{Await, Future, FuturePool}
import com.twitter.logging.Logger

class LoggingFilter extends SimpleFilter[http.Request, http.Response]  {
  private val log = Logger.get(getClass)

  def apply(request: http.Request, service: Service[http.Request, http.Response]) = {
    log.info(s"request start. thread_id: ${Thread.currentThread.getId}")
    val f = service(request)
    log.info(s"request end. thread_id: ${Thread.currentThread.getId}")
    f
  }
}

class HelloService extends Service[http.Request, http.Response] {
  private val log = Logger.get(getClass)

  override def apply(request: http.Request): Future[http.Response] = FuturePool.unboundedPool {
    val response = http.Response()
    val message  = "Hello, Finagle."
    log.info(s"message: ${message} thread_id: ${Thread.currentThread.getId}")
    response.setContentString(s"${message}\n");
    response
  }
}

object HTTPServer {

  private val log = Logger.get(getClass)

  val loggingFilter = new LoggingFilter
  val helloService  = new HelloService

  def main(args: Array[String]):Unit = {
    val service = loggingFilter andThen helloService
    val server = Http.serve(":8080", service)
    Await.ready(server)
  }
}

sbt run で起動し、同様にして何回かリクエストを送信してみる。

% sbt run
[info] Set current project to finagle-example (in build file:/home/akishin/src/scala/finagle_ex/)
[info] Compiling 1 Scala source to /home/akishin/src/scala/finagle_ex/target/scala-2.11/classes...
[info] Running HTTPServer 
3 31, 2016 10:47:57 午後 com.twitter.finagle.Init$$anonfun$1 apply$mcV$sp
情報: Finagle version 6.34.0 (rev=44f444f606b10582c2da8d5770b7879ddd961211) built at 20160310-155158
3 31, 2016 10:48:28 午後 LoggingFilter apply
INFO: request start. thread_id: 21
3 31, 2016 10:48:28 午後 LoggingFilter apply
INFO: request end. thread_id: 21
3 31, 2016 10:48:28 午後 HelloService$$anonfun$apply$1 apply
INFO: message: Hello, Finagle. thread_id: 26
3 31, 2016 10:48:28 午後 LoggingFilter apply
INFO: request start. thread_id: 22
3 31, 2016 10:48:28 午後 LoggingFilter apply
INFO: request end. thread_id: 22
3 31, 2016 10:48:28 午後 HelloService$$anonfun$apply$1 apply
INFO: message: Hello, Finagle. thread_id: 26
3 31, 2016 10:48:28 午後 LoggingFilter apply
INFO: request start. thread_id: 23
3 31, 2016 10:48:28 午後 LoggingFilter apply
INFO: request end. thread_id: 23
3 31, 2016 10:48:28 午後 HelloService$$anonfun$apply$1 apply
INFO: message: Hello, Finagle. thread_id: 26

今度は Service のログのみ別のスレッドになり、またログの出力順もフィルタで出力している request end. の後に Service のログが出るようになった。

試しに VisualVM でアタッチしてスレッドを見ながら ab で適当に負荷かけてみると以下のようにスレッドがたくさん生成されるのが判る。

WS000085.jpg

FuturePool.unboundedPool の実装を見てみると java.util.concurrent.Executors.newCachedThreadPool を元に作られていた。

https://github.com/twitter/util/blob/develop/util-core/src/main/scala/com/twitter/util/FuturePool.scala#L42-L55
scala:FuturePool.scala
private lazy val defaultExecutor = Executors.newCachedThreadPool(
new NamedPoolThreadFactory("UnboundedFuturePool", makeDaemons = true)
)
・・・
lazy val unboundedPool: FuturePool =
new ExecutorServiceFuturePool(defaultExecutor)

java.util.concurrent.Executors.newCachedThreadPool の javadoc を見てみると以下のように書いてある。

必要に応じ、新規スレッドを作成するスレッドプールを作成しますが、利用可能な場合には以前に構築されたスレッドを再利用します。
通常、これらのプールは、短期の非同期タスクを多数実行するプログラムのパフォーマンスを改善します。
execute を呼び出すと、以前に構築されたスレッドが利用可能であれば、それを再利用します。
既存のスレッドが使用できない場合、新しい接続が作成され、プールに追加されます。
60 秒間使用されなかったスレッドは、終了して、キャッシュから削除されます。
このため、長い間アイドル状態のプールによりリソースが消費されることはありません。
類似のプロパティーを持つが、詳細の異なるプール (timeout パラメータなど) は、ThreadPoolExecutor コンストラクタを使用して作成できます。

特に生成するスレッドの最大数を指定するパラメータなどは無いようなので、リクエストが多くなればメモリが許す限り新しいスレッドが作成されるっぽい。

FixedThreadPool を使った FuturePool

Web のように不特定多数のリクエストを受ける環境の場合、リクエストによって無制限にスレッドが作られてしまうのはちょっと怖いような気がする。

ということで生成するスレッド数を固定に出来る FixedThreadPool を使ってみた。

FuturePool.unboundedPool に習って NamedPoolThreadFactory を使って名前を付けている。

import com.twitter.finagle.{http, Http, Service, SimpleFilter}
import com.twitter.util.{Await, Future, FuturePool}
import com.twitter.logging.Logger
import com.twitter.concurrent.NamedPoolThreadFactory
import java.util.concurrent.Executors

class LoggingFilter extends SimpleFilter[http.Request, http.Response]  {
  private val log = Logger.get(getClass)

  def apply(request: http.Request, service: Service[http.Request, http.Response]) = {
    log.info(s"request start. thread_id: ${Thread.currentThread.getId}")
    val f = service(request)
    log.info(s"request end. thread_id: ${Thread.currentThread.getId}")
    f
  }
}

class HelloService extends Service[http.Request, http.Response] {
  private val log = Logger.get(getClass)

  // 生成されるスレッドの最大値を 5 に指定した FixedThreadPool
  val fixedThreadExecutor = Executors.newFixedThreadPool(5,
    new NamedPoolThreadFactory("FixedFuturePool", makeDaemons = true)
  )
  val futurePool: FuturePool = FuturePool(fixedThreadExecutor)

  override def apply(request: http.Request): Future[http.Response] = futurePool {
    val response = http.Response()
    val message  = "Hello, Finagle."
    log.info(s"message: ${message} thread_id: ${Thread.currentThread.getId}")
    response.setContentString(s"${message}\n");
    response
  }
}

object HTTPServer {

  private val log = Logger.get(getClass)

  val loggingFilter = new LoggingFilter
  val helloService  = new HelloService

  def main(args: Array[String]):Unit = {
    val service = loggingFilter andThen helloService
    val server = Http.serve(":8080", service)
    Await.ready(server)
  }
}

これで ab で大量にリクエストしてみた時のスレッドの状態を VisualVM で見てみる。

WS000086.jpg

スレッドが名前順で並んでくれなかったので判りづらいが、 abconcurrency を上げてもスレッドは最大で 5 までしか生成されなくなった。

結論

本番環境では FixedThreadPool を使い最大スレッド数は固定にしておいて、サーバスペックや負荷を見ながら調整していくのがよさげ。

6
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
4