Spring WebFlux とブロッキング処理でパフォーマンス測ってみた
はじめに
Spring WebFlux で API を作ってみたらパフォーマンス低すぎて使いものにならなかった・・・なんてことがありました。
当時実装していたのは API で受け付けたリクエストを Elasticsearch 向けの集計クエリに変換してリクエストするような構成でした。
ただ Elasticsearch からのレスポンスが遅く、リクエストがつまりはじめ、最終的にはリクエストしてもすべてタイムアウトになってしまい。。。
その時は実装を変えてはパフォーマンスの検証を繰り返してだいぶ時間を費やしてしまいました。
本稿はその時の色々な実装方法で試したときの備忘録です。
※ 最新バージョンで異なる挙動をしている可能性もあるので、随時最新情報をご確認ください。
前提
- 言語
- Kotlin
- バージョン
- JDK
- Amazon correto JDK
- 11.0.3_7
- Sproing Boot
- 2.5.4
- Amazon correto JDK
- JDK
エンドポイントについて
Kotlin で WebFlux を利用した際のエンドポイントは大きく分けて 3 通りの実装方法がある。
- 関数型エンドポイント
- router
- corouter
- Controller アノテーション
それぞれの実装について以下に示す。
関数型エンドポイント router
@Configuration
class RouterConfig(
private val handler: RouterHandler
) {
@Bean
fun router() = router {
GET("/router/hello", handler::greet)
}
}
@Component
class RouterHandler {
fun greet(request: ServerRequest): Mono<ServerResponse> {
println(Thread.currentThread())
Thread.sleep(10_000L)
return ServerResponse.ok()
.bodyValue("Hello!")
}
}
関数型エンドポイント coRouter
@Configuration
class CoRouterConfig(
private val handler: CoRouterHandler
) {
@Bean
fun corouter() = coRouter {
GET("/corouter/hello", handler::greet)
}
}
@Component
class CoRouterHandler {
suspend fun greet(request: ServerRequest): ServerResponse {
println(Thread.currentThread())
Thread.sleep(10_000L)
return ServerResponse.ok()
.bodyValueAndAwait("Hello!")
}
}
Controller アノテーション
@RestController
class SampleController {
@GetMapping("/controller/hello")
fun greet(): String {
println(Thread.currentThread())
Thread.sleep(10_000L)
return "Hello!"
}
}
パフォーマンス検証
それぞれの実装の中で Thread.sleep()
で擬似的にブロッキング処理を行った。
それぞれのエンドポイントに対して Gatling で負荷をかける。
※ 以下は Controller アノテーションでの負荷検証シナリオだが、関数型エンドポイントのそれぞれについても同様なシナリオを用いた。
class ControllerSimulation extends Simulation {
// ステップ数
val stepRequestPerSec = 3
// ループ回数
val loopCount = 5
// Step数だけリクエストを10秒間かけて増加、20秒間維持
def createSequence(previousUsers: Int, step: Int): Seq[OpenInjectionStep] = Seq(
rampUsersPerSec(previousUsers) to (previousUsers + step) during (10 seconds),
constantUsersPerSec(previousUsers + step) during (20 seconds)
)
// リクエスト数の調整
var seq: Seq[OpenInjectionStep] = Seq()
var users = 0
for (i <- 0 until loopCount) {
seq = seq.appendedAll(createSequence(users, stepRequestPerSec))
users = users + stepRequestPerSec
}
val httpConf = http
// API はローカル環境で立ち上げる
.baseUrl("http://localhost:8080/")
val scn = scenario("controller")
.exec(
http("Get controller")
.get("/controller/hello")
.check(status.is(200))
)
setUp(scn.inject(seq).protocols(httpConf))
}
結果
それぞれの結果についてまとめる。
関数型エンドポイント router
関数型エンドポイント coRouter
Controller アノテーション
説明
結果として、いずれのエンドポイントからも API からのレスポンスが得られなくなった。
レスポンスが返ってこなくなった後に curl で手動にてリクエストしても、レスポンスが非常に遅い、もしくは全く返ってこない。(タイムアウトになる。)
また、ヘルスチェック用に Spring Boot Actuator も含めていたが、そのエンドポイントもレスポンスが得られなかった。
ブロッキング処理の解決方法
Spring Boot のリファレンスを読んでみると以下の記述があった。
ブロッキング API の呼び出し
ブロッキングライブラリを使用する必要がある場合はどうなるでしょうか? Reactor と RxJava は両方とも、異なるスレッドで処理を続行するために publishOn オペレーターを提供します。つまり、簡単な脱出ハッチがあります。ただし、ブロッキング API はこの同時実行モデルには適していません。
対処方法はあるが前提として WebFlux 内ではブロッキング処理を想定されていない。
関数型エンドポイント coRouter での回避方法
reactor のコアスレッドがブロッキングされないように処理を他スレッドに分けることで対応可能。
以下の方法であればブロッキング処理でもリクエストがつまらなくなる。
Mono や Flux を使って Pub/Sub 構成
上記の公式リファレンスの ブロッキング API の呼び出し
にあるとおり、 publishOn
を使えば別スレッドにブロッキング処理を渡し、コアスレッドは別処理に移ることができる。
手元で実装してみたコードは以下の通り。
@Configuration
class RouterConfig(
private val handler: RouterHandler,
private val nonBlockingHandler: RouterNonBlockingHandler
) {
@Bean
fun nonBlockingRouter() = router {
GET("/router/nonBlocking/hello", nonBlockingHandler::greet)
}
}
@Component
class RouterNonBlockingHandler {
fun greet(request: ServerRequest): Mono<ServerResponse> {
println("thred1 - ${Thread.currentThread()}")
return ServerResponse.ok()
.body(
Mono.fromCallable {
println("thred2 - ${Thread.currentThread()}")
Thread.sleep(10_000L)
"Hello!"
}.publishOn(Schedulers.boundedElastic()),
String::class.java
)
}
}
負荷検証
Gatling で上記に使ったスクリプトで再度負荷検証を行った。
結果として性能の改善は見込めた。
ただしスレッド数が制限されているため、ある程度のリクエストが詰まるとタイムアウトとなるリクエストが増えてしまう。。
充分なスレッド数をもたせたら
上記ではブロッキング処理を担当する別スレッドのスレッド数が制限されていたが、この制限を十分まで増やして性能の改善があるかを確認。
実装は以下の通り。
@Component
class RouterNonBlockingHandler {
fun greet(request: ServerRequest): Mono<ServerResponse> {
println("thred1 - ${Thread.currentThread()}")
return ServerResponse.ok()
.body(
Mono.fromCallable {
println("thred2 - ${Thread.currentThread()}")
Thread.sleep(10_000L)
"Hello!"
}.publishOn(Schedulers.newBoundedElastic(5000, 1000, "bound-elastic")),
String::class.java
)
}
}
負荷検証
結果として、すべてのリクエストを捌くことができた。
ただし、考えられる懸念として内部の処理が複雑+巨大になればメモリを圧迫し結果的にパフォーマンスが落ちてしまうのではないかと思われる。
まとめ
WebFlux でブロッキング処理を挟む場合はコードにひと手間追加する必要があるようです。
WebFlux を触り始めたばかりなので抜けている部分も多々あるかと思われますが。。。
個人的に時間のかかった部分だったので備忘録として残しておきたいと思います。
(もうちょっときれいに実装できそうなんですがそのあたりも模索中です。。。
あと、リクエストが詰まった時の挙動(メモリとか)も調べてみようかと思っています。)
。。。というか当時を振り返ると Spring MVC でよかったよねって言われることがあります汗
その通りです汗
以上