X-Rayをちゃんと使ってみてなかったので、調べて使ってみました。
ブラックボックスピタゴラスイ◯ッチやめよう。
今回はAPI GatewayをイベントトリガーにしてLambdaファンクションをキックするようにします。
ソースたちはここに入っています。
https://github.com/yKarakita/lambda_xray
環境など
- Servereless Framework 1.22.0
- python3.6
- aws-xray-sdk 0.95
サンプルファンクション
これを元にしてX-Rayを導入してみる。
import json
def hello(event, context):
body = {
"message": "Hello!"
}
response = {
"statusCode": 200,
"body": json.dumps(body)
}
return response
X-Rayのトレース処理を追加
import json
from aws_xray_sdk.core import xray_recorder
def hello(event, context):
xray_recorder.begin_subsegment('response hello')
body = {
"message": "Hello!"
}
response = {
"statusCode": 200,
"body": json.dumps(body)
}
xray_recorder.end_subsegment()
return response
デコレータを使う方法
デコレータを使っても同様のことができるのでこちらのほうがシンプルでいいかもしれない。
...
@xray_recorder.capture('response hello')
def hello(event, context):
# ... some code
デプロイ
X-RayのSDKはLambdaにデフォルトで組み込まれていないのでをデプロイパッケージに含める必要があります。
今回はServerlessのプラグインserverless-python-requirements
を使用して$ sls deploy
時にrequirements.txtを読み込んで動的にデプロイパッケージを生成しています。
aws-xray-sdk==0.95
$ sls deploy
APIをキック
$ curl https://<API Gatewayエンドポイント>/dev/hello
{"message": "Hello!"}
うまくトレースできてました。
Annotationを使ってみる
Annotationを使うと、Subsegmentを任意のキーで絞込みできるようになる。
書き方はこんな感じ。
subsegment.put_annotation('key', 'value')
...
@xray_recorder.capture('response hello')
def hello(event, context):
# ... some code
status_code = 200
subsegment = xray_recorder.current_subsegment()
subsegment.put_annotation('response_status_code', status_code)
コンソールからはこんな感じで確認できる。
フィルタ式を使用した AWS X-Ray コンソールでのトレースの検索
X-Rayコンソールの入力フィールドにannotation.response_status_code = 200
といれるとannotationのresponse_status_code
に200を入れてputしたものだけを絞りこめる。
これは便利そう!
メタデータを使ってみる
サブセグメントにメタデータを追加することができます。
書き方はこんな感じ
subsegment.put_metadata('key', dict, 'namespace')
import uuid
from aws_xray_sdk.core import xray_recorder
@xray_recorder.capture('response hello')
def hello(event, context):
# ... some code
subsegment.put_metadata('key01', {'id': str(uuid.uuid4())}, 'namespace01')
subsegment.put_metadata('key02', {'id': str(uuid.uuid4())}, 'namespace01')
うん、必要になったら使ってみようかな。
AWSサービスの処理を追跡してみる
https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/lambda-x-ray.html
boto3 (セッションを使用している場合は botocore) にパッチを適用して、AWS のサービスにアクセスするクライアントを作成すると自動的に X-Ray によって追跡されるようにできます。
便利そう。
こんな感じでboto3
にX-Rayのパッチを適用してあげることで、AWSリソースのトレースができるようになるみたいです。
import boto3
from aws_xray_sdk.core import patch
patch(['boto3'])
API Gateway -> Lambda -> S3 Putのサンプル。
import json
import uuid
import os
import boto3
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch
patch(['boto3'])
s3_client = boto3.client('s3')
bucket_name = os.environ['S3_BUCKET']
@xray_recorder.capture('handler')
def hello(event, context):
keyword = event['queryStringParameters']['keyword']
put_object_into_s3(bucket_name=bucket_name, key=str(uuid.uuid4()), body=keyword)
response = {
"statusCode": 200,
"body": json.dumps({'message': 'ok'})
}
return response
@xray_recorder.capture('put_object')
def put_object_into_s3(bucket_name, key, body):
subsegment = xray_recorder.current_subsegment()
response = s3_client.put_object(Bucket=bucket_name, Key=key, Body=body)
status_code = response['ResponseMetadata']['HTTPStatusCode']
subsegment.put_annotation('put_response', status_code)
$ curl https://xxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/hello?keyword=hogeeeeeeeeeeee
{"message": "ok"}
詳細とサービスマップはこんな感じ。うまくいってるっぽいすね。
非同期処理をトレースしたい
こんな感じの非同期処理だとどうだろ。
API Gateway
↓
Lambda(SNSにPubした後HTTPレスポンスを返す)
↓
SNS
↓
Lambda(SNSからメッセージをSubしてS3にPutする)
↓
S3
やってみます。
何も考えずにすべての関数にX-Rayデコレータをつけてみる。
まずAPI Gatewayからイベントを受け取ってSNSにPubするファンクション。
import json
import os
import boto3
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch
patch(['boto3'])
sns = boto3.resource('sns', 'ap-northeast-1')
sns_client = boto3.client('sns')
topic_arn = os.environ['SNS_TOPIC_ARN']
@xray_recorder.capture('handler')
def hello(event, context):
keyword = event['queryStringParameters']['keyword']
sns_client.publish(
TopicArn=topic_arn,
Message=keyword,
Subject='object_body'
)
response = {
"statusCode": 200,
"body": json.dumps({'message': 'ok'})
}
return response
そしてSNSをSubして、S3にPutするファンクション。
import uuid
import os
import boto3
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch
patch(['boto3'])
s3_client = boto3.client('s3')
bucket_name = os.environ['S3_BUCKET']
@xray_recorder.capture('put_s3_handler')
def handler(event, context):
subscribed_data = event['Records'][0]['Sns']
body = subscribed_data['Message']
put_object_into_s3(bucket_name, str(uuid.uuid4()), body)
@xray_recorder.capture('put_object')
def put_object_into_s3(bucket_name, bucket_key, body):
subsegment = xray_recorder.current_subsegment()
response = s3_client.put_object(Bucket=bucket_name, Key=bucket_key, Body=body)
status_code = response['ResponseMetadata']['HTTPStatusCode']
subsegment.put_annotation('put_response', status_code)
$ curl https://xxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/hello?keyword=hogeeeeeeeeeeee
{"message": "ok"}
実行するとこんな感じでSNSより前とそれ以降が分かれてしまい、うまくトレースできませんでした。
調べてみたところ現時点でこれをうまくトレースさせたい場合は、SNSとその次のLambdaの間に、もうひとつ別のLambdaを用意する必要があるとのことでした。
Lambda -> SNS -> Lambda -> Lambda
最初のLambdaで生成されたTrace IDをSNSで中継役のLambdaに渡して、そいつがTrace IDを含めて次のLambdaを起動すればいけるみたい。
こちらの記事で詳しく紹介されています。
AWS X-Rayで非同期メッセージをトレースする
まとめ
入れておこう。