Microservicesを構築するにあたり、各サービス間のメッセージのやりとりに、非同期処理を使うことがあると思います。
その非同期処理をぱっと簡単にトレースしたいとかってあると思いますが、それを、Serverlessにやってみたってお話です。
やりたいこと
こんな流れを想定しています。
最初のLambdaから、SQSにメッセージを送信し、SQSをcronで起動されたLambdaがポーリングして、メッセージを取得したあと、なんしかの処理をした後、DynamoDBにデータを永続化します。
実際のX-Ray
こんな感じで出力されます。
ぱっと見でうまいことトレースができている様子ですが、実際には、publishしたメッセージがsubscribeした側との繋りがなく、トレースができているようでいて、できていません。
上図のように、publishのトレースを見てみると、SQSにメッセージを送信した時点で切れています。
トレースできるようにする方法
pubしたメッセージを実際にDynamoDBに保存されるまでトレースするには、以下のような構成を取るといけます。
Lambda1でpubする方法
Lambda1でメッセージを送信する際に、Lambda実行時の環境変数から、TraceIdを取り出して、SQSのメッセージに加えます。
Scalaで書くとこんな感じ。
val sqs = AmazonSQSClient.builder()
.withRegion(region)
.withRequestHandlers(new TracingHandler(AWSXRay.getGlobalRecorder))
.build()
val traceId = sys.env("_X_AMZN_TRACE_ID")
sqs.sendMessage(queueUrl, traceId)
Lambda2でsubする方法
Lambda2でsubするときに、SQSのメッセージからTraceIdを取得して、Lambda3をinvokeします。
Scalaで書くとこんな感じ。
val clientConfiguration = new ClientConfiguration()
clientConfiguration.addHeader("x-amzn-trace-id", message.getBody)
val lambda = AWSLambdaClient.builder()
.withRegion(region)
.withClientConfiguration(clientConfiguration)
.build
val invokeRequest = new InvokeRequest()
.withFunctionName(functionName)
.withPayload(s"""{"id":"${message.getBody}"}""")
.withLogType(LogType.Tail)
lambda.invoke(invokeRequest)
トレースできてるX-Ray
この手法だと、以下の様にトレースできています。
ちょっと複雑な感じに見えますが、これでメッセージをpubしてからDynamoDBにputするまでトレースできています。
このように、pubしたものが、38.5sかけて、subされてDynamoDBにputされている様子が見えます。
まとめ
非同期を可視化してトレースできるっていうのは、Microservicesを運用していくうえで、けっこー重要な可視化ですよね。
それをX-Rayで実現していくのに、こういった方法が取れます。
とはいえ、Lambda2 => Lambda3を起動するということで、レイテンシーの低下がありますから、そのあたりが許容できるかどうかは、要件次第でもあります。
X-Rayが改善されて、このような工夫をしなくてもトレースできることを期待しています。
以上でっす。
検証でつくったGitHub