概要
我々のアプリケーションでは、外部サーバーへの接続が必要となるシナリオが 3 つあり、WebClient を使用して、POST リクエストしています。
- シナリオ1 : ある外部 API サーバーへの接続
- シナリオ2 : API Gateway への接続
- シナリオ3 : IOT デバイスへの API コールをするための中継サーバー接続
Spring Boot のバージョンをあげたところ、3 つのシナリオのうち、外部 API サーバーへの接続のシナリオにおいて、表題の現象が発生する頻度が増加しました。
頻度
バージョンアップ前は、日に 1 ~ 2 度程度でしたが、バージョンアップ後は、日に 50 件程度になりました。なお、問題が発生していないAPI Gateway への接続、IOT デバイスへの API コールをするための中継サーバー接続のシナリオについては、バージョンアップ前においても、表題のエラーが発生することはありませんでした。
バージョンアップ前
Spring Boot 2.2.6.RELEASE
=> Spring Web Flux 5.2.5
=> reactor core 3.3.4.RELEASE
=> reactor netty http 0.9.6.RELEASE
バージョンアップ後
Spring Boot 2.5.6
=> Spring Web Flux 5.3.12
=> reactor core 3.4.11
=> reactor netty http 1.0.12
問題となった実装
問題が発生しているシナリオ、発生していないシナリオ共に、下記と同様の実装になっています。(retrieve は成功以外のレスポンスを受け取ると例外を発生させるので、外側で例外をキャッチしています。また、exchangeToMono などのメソッドを使うと、成功以外のレスポンスを受け取っても例外を発生させませんが、Connection reset by peer が発生する場合、WebClientRequestException が発生して、必ず例外が投げられます。おそらく、リクエスト先への接続すらできていないため、この挙動になると思われます。そのため、最も確実かつ効率よく例外をハンドルしたい場合は、retrieve メソッドを使いつつ、利用元にて try catch することになります。)
@Component
class ExternalApiClient(
private val webClientBuilder: WebClient.Builder
) {
fun post(parameter: String): Pair<HttpStatus, String?>? {
return webClientBuilder
.build()
.post()
.uri("https://api.example.com/resources/function")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.body(
BodyInserters.fromValue(
mapOf(
"parameter" to parameter
)
)
)
.retrieve()
.toEntity(String::class.java).let {
it.block()?.let {
it.statusCode to it.body
}
}
}
}
エラーの内容
org.springframework.web.reactive.function.client.WebClientRequestException: readAddress(..)
failed: Connection reset by peer; nested exception is io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141) ~[spring-webflux-5.3.12.jar!/:5.3.12]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s):
*__checkpoint ⇢ Request to POST https:/api.example.com/resources/function [DefaultWebClient]
Stack trace:
at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$wrapException$9(ExchangeFunctions.java:141) ~[spring-webflux-5.3.12.jar!/:5.3.12]
at reactor.core.publisher.MonoErrorSupplied.subscribe(MonoErrorSupplied.java:55) ~[reactor-core-3.4.11.jar!/:3.4.11]
at reactor.core.publisher.Mono.subscribe(Mono.java:4399) ~[reactor-core-3.4.11.jar!/:3.4.11]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.11.jar!/:3.4.11]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222) ~[reactor-core-3.4.11.jar!/:3.4.11]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222) ~[reactor-core-3.4.11.jar!/:3.4.11]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:222) ~[reactor-core-3.4.11.jar!/:3.4.11]
at reactor.core.publisher.MonoNext$NextSubscriber.onError(MonoNext.java:93) ~[reactor-core-3.4.11.jar!/:3.4.11]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onError(MonoFlatMapMany.java:204) ~[reactor-core-3.4.11.jar!/:3.4.11]
at reactor.core.publisher.SerializedSubscriber.onError(SerializedSubscriber.java:124) ~[reactor-core-3.4.11.jar!/:3.4.11]
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.whenError(FluxRetryWhen.java:225) ~[reactor-core-3.4.11.jar!/:3.4.11]
at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onError(FluxRetryWhen.java:274) ~[reactor-core-3.4.11.jar!/:3.4.11]
原因について
トライ & エラーをしながら調査した結果、こちらの議論 で指摘されているように、コネクションプールの再利用に起因していると考えて間違えないと判断しました。
後述の対処策から、WebClient は、コネクションプールをキャッシュしている内部動作になっているはずであると考えています。また、問題が発生している外部 API サーバーについては、確立したままのコネクションを定期的に切断しており、他のリクエストが必ず成功する接続先のサーバーよりコネクションを切断する間隔が短い挙動になっていると考えています。Spring Boot のバージョンアップ前までは、エラーの発生頻度から、デフォルトのコネクションプールのライフサイクルに従い適切なタイミングまでコネクションプールが再利用されてスループットを向上させる挙動になっていたと思われます。しかし、Spring Boot のバージョンアップによって、WebClient (netty) のコネクションプールのライフサイクルが、外部 API サーバーのコネクションの切断の期間より長くなってしまったのだと考えています。
詳細
おそらく、WebClient のさらに奥にある netty では、内部的に、コネクションプールをキャッシュしている挙動になっていると考えられます。
WebClient#builderメソッド を見てみると、新規に、DefaultWebClientBuilder を生成しており、DefaultWebClientBuilder#buildメソッド にて、ReactorClientHttpConnector を生成する挙動になっています。新規インスタンスを生成する際にはキャッシュは使われていないように見えますが、最終的に、[netty の実装] (https://github.com/reactor/reactor-netty/blob/a00bfb13805b2678595e4bf8669462c203f92492/reactor-netty-core/src/main/java/reactor/netty/tcp/TcpResources.java#L352) の TcpResource#getOrCreate にたどり着いた際に、コネクションプールが再利用される挙動になっていると思われます。というのも、Spring Boot が提供している、WebClient.Builder を使用せず、
fun post(parameter: String): Pair<HttpStatus, String?>? {
return WebClient.
.builder()
.build()
としても現象が改善しなかったためです。
対処策について
GitHub 上の議論では、コネクションプールのライフサイクルについて微調整を加えて対処していましたが、スピードと安全性の天秤から、clientConnector に、明示的に新規コネクションを指定 する方法を採用しました。
また、念のため、Spring Boot が提供している、WebClientBuilder は使用せず、新規 WebClientBuilder を生成するようにしています。
object ExternalApiClient {
fun post(parameter: String): Pair<HttpStatus, String?>? {
return WebClient.
.builder()
.clientConnector(ReactorClientHttpConnector(HttpClient.newConnection()))
.build()
.post()
.uri("https://api.example.com/resources/function")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.body(
BodyInserters.fromValue(
mapOf(
"parameter" to parameter
)
)
)
.retrieve()
.toEntity(String::class.java).let {
it.block()?.let {
it.statusCode to it.body
}
}
}
}