はじめに
SQSにLambdaトリガーを設定して、SQSがメッセージを受信した時、自動でLambda関数が発火する仕組みを試してみました。
見なくていいですが、気分は前回記事の続き?です。
挙動まとめ
こんな感じの挙動が確認できました。
①正常系
キューはメッセージ受信後、メッセージをeventに含めながらlambdaを実行します。
可視性タイムアウト時間内にlambdaが正常終了すると、キューからメッセージが削除されます。
②異常系
キューはメッセージ受信後、メッセージをevent contextにしながらlambdaを実行します。
可視性タイムアウト時間内にlambdaが終了しない、または、lambdaが異常終了した場合、メッセージは削除されません。
lambda関数の実行回数がメッセージの最大受信数を下回る場合、lambda関数は再度実行されます。
lambda関数の実行回数がメッセージの最大受信数と等しい場合、lambda関数は実行されず、デッドレターキューへメッセージが転送されます。
その後、キューのメッセージは削除されます。
動作確認
事前設定
lambda関数にSQS操作権限を追加
{
"Statement": [
{
"Action": [
"logs:*"
],
"Effect": "Allow",
"Resource": "arn:aws:logs:*:*:*"
},
{
"Action": [
"sqs:ReceiveMessage",
"sqs:GetQueueAttributes",
"sqs:DeleteMessage"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
}
SQSとlambdaを連携
SQS側とlambda側どちらからでも設定できます。
SQS画面から設定
Lambda トリガーから対象の関数を指定するだけで完了です。
お手軽でびっくりしました。
Lambda画面から設定
トリガーを追加からSQSを対象にすればOKです。
イベントフィルターなど、SQS側から設定するより多くの設定項目を選択できます。
テスト実施
関数を準備
import json
import time
def lambda_handler(event, context):
try:
print("Received event: " + json.dumps(event, indent=2))
# time.sleep(30) # 可視性タイムアウト時の挙動確認用
# print(aaa) # lambda関数の実行時エラー発生時の挙動確認用
except Exception as e:
print(e)
raise e
else:
return {
"statusCode": "200",
"body" : json.dumps({"test": "value"})
}
キューへメッセージ送信
aws sqs send-message --queue-url ${PRIMARY_QUEUE_ENDPOINT} \
--message-body "{\"type\": \"test\", \"index\": \"001\", \"text\": \"aaa\"}" \
--message-deduplication-id $(openssl rand -base64 20 | tr -d '\n') \
--message-group-id test_group \
--output json
cloudwatchで動作確認~正常系~
eventオブジェクト内に送信したメッセージbodyが含まれていることが確認できます。
また、キューからメッセージが削除されていることが確認できます。
cloudwatchで動作確認~異常系・可視性タイムアウト~
- # time.sleep(30) # 可視性タイムアウト時の挙動確認用
+ time.sleep(30) # 可視性タイムアウト時の挙動確認用
lambda関数の処理時間を意図的に長く設定し、キューの可視性タイムアウトを起こしてみます。
ログにメッセージIdが複数回表示されており、TimeStampからもリトライが発生していることが分かります。
最終的にメッセージがデッドレターに送信されていることも確認できました。
さいごに
簡単にですがSQSとLambdaを連携することができました。
リトライやメッセージの削除、デッドレターキューへの送信などの後処理まで自動で行ってくれて、かなり便利でした。
機会があれば、ぜひアーキテクチャに組み込みたいです。