初めに
AWSの資格勉強で SQS について問われることが多かったのですが、実際に触ったことがなく、どういった操作方法なのか気になっていました。また、試験で勉強したことがあるという段階からステップアップしたかったということもあり使ってみました。
やったこと
SQS経由でテストデータをDynamoDBに登録することをしました。
EC2 → SQS → Lambda → DynamoDB
- EC2
- メッセージを SQS に送信する
- SQS
- EC2からメッセージを受信し Lambda のトリガーとなるキューと、デッドレターキューを用意する
- Lambda
- SQSのメッセージの受信をトリガーに DynamoDB にテストデータを登録し、メッセージを削除する
EC2での操作
AWS CLIを 使って SQS にメッセージを送信します。
メッセージを送信
送信のコマンドは aws sqs send-message
です。
-
--queue-url
- キューのURLを渡す
-
--message-body
- メッセージ部分に DynamoDB のテーブル名を渡す
-
--message-attributes
- DynamoDBに登録するアイテムを記述したファイルを渡す
送信成功時は以下のような JSON が出力されます。
[ec2-user@ip-172-31-34-229 ~]$ aws sqs send-message \
--region ap-northeast-1 \
--queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/test \
--message-body "Products" \
--message-attributes file://send-message.json
出力
{
"MD5OfMessageBody": "068f80c7519d0528fb08e82137a72131",
"MD5OfMessageAttributes": "7dbe6d7feabe8374790be14e676cd2d4",
"MessageId": "9b75e2f3-9410-4373-a2bf-e655a492633a"
}
send-message.json は以下のファイルです。
{
"ProductName": {
"DataType": "String",
"StringValue": "chair"
},
"Price": {
"DataType": "Number",
"StringValue": "2000"
}
}
メッセージを受信
今回は使用しませんが、メッセージの受信コマンドです。
- キューに保存されているメッセージの確認
[ec2-user@ip-172-31-34-229 ~]$ aws sqs receive-message \
--region ap-northeast-1 \
--queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/test
出力
※ v/XkNyUcF7Dtz2aFAA==
は省略しています。
{
"Messages": [
{
"Body": "001",
"ReceiptHandle": "v/XkNyUcF7Dtz2aFAA==",
"MD5OfBody": "dc5c7986daef50c1e02ab09b442ee34f",
"MessageId": "a2306bfb-c9ad-4a7b-b087-7e9d05f905d3"
}
]
}
メッセージを削除
キューに保存されているメッセージを削除します。削除メッセージの ReceiptHandle
を指定します。
※ AQEBN8BxbfbkKxPVVNq4cw==
は省略しています。
[ec2-user@ip-172-31-34-229 ~]$ aws sqs delete-message \
--region ap-northeast-1 \
--queue-url https://sqs.ap-northeast-1.amazonaws.com/123456789012/test \
--receipt-handle AQEBN8BxbfbkKxPVVNq4cw==
SQSでの操作
キューを作成します。ここでは「test」というキューを作成しました。他の設定箇所はすべてデフォルトで作成します。
Lambdaでの操作
Lambda関数は以下のように書きました。
import boto3
def lambda_handler(event, context):
for record in event['Records']:
print(record)
product_data = record['messageAttributes']
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(record['body'])
item = {
'ProductName': product_data['ProductName']['stringValue'],
'Price': product_data['Price']['stringValue'],
'Time': record['attributes']['SentTimestamp']
}
table.put_item(Item=item)
return {
'statusCode': 200,
'body': event
}
上記の record ですが、以下のような構造になっています。
※'Bp/OBCOGlB191EjjFuNdd=='
は省略しています。
{
'messageId': 'd4e2f90d-9693-4b13-b96d-50573362f3dd',
'receiptHandle': 'Bp/OBCOGlB191EjjFuNdd==',
'body': 'Products',
'attributes': {
'ApproximateReceiveCount': '13',
'SentTimestamp': '1621644982480',
'SenderId': 'AROARL3774TQC6UJDRGIO:i-00d47f6259300590c',
'ApproximateFirstReceiveTimestamp': '1621644982480'
},
'messageAttributes': {
'ProductName': {
'stringValue': 'chair',
'stringListValues': [],
'binaryListValues': [],
'dataType': 'String'
},
'Price': {
'stringValue': '2000',
'stringListValues': [],
'binaryListValues': [],
'dataType': 'Number'
}
},
'md5OfMessageAttributes': '7dbe6d7feabe8374790be14e676cd2d4',
'md5OfBody': '068f80c7519d0528fb08e82137a72131',
'eventSource': 'aws:sqs',
'eventSourceARN': 'arn:aws:sqs:ap-northeast-1:123456789012:test',
'awsRegion': 'ap-northeast-1'
}
body
に --message-body "Products"
の値が入ります。
messageAttributes
に send-message.json の内容が入ります。
Lambda の実行が成功した場合、キューのメッセージは削除されます。Lambda の実行が失敗した場合、キューのメッセージは削除されず、その後数回同じ Lambda 関数が実行されていることが CloudWatch Logs から確認できました。これに関してドキュメントに以下の記述があります。
デフォルトでは、SQS でレコードが使用可能になるとすぐに、Lambda 関数を呼び出します。
Lambda 関数のエラーなどでメッセージの処理に失敗すると、そのメッセージは削除されず、「処理中のメッセージ」から「利用可能なメッセージ」へと移動し、再度 Lambda はメッセージを受信します。
デッドレターキュー
処理に失敗したあるキューのメッセージに対して、 「何回失敗したか」によって他のキューにメッセージを移動します。Lambda がメッセージ処理に失敗した場合、Lambda がそのキューを再度呼び出さないようにデッドレターキューを設定します。
検証
「dead-letter-queue」というデッドレターキュー用のキューを作成します。
以下の「編集」からデッドレターキューの編集をします。
「有効」にチェックを入れ、「キューの選択」にデッドレターキュー用のキューのARNを指定します。
Lambda のトリガーには「test」キューを指定し、Lambda 関数をエラーが出るようなコードにします。その後「test」キューにメッセージを送信します。Lambda がメッセージを受信するので、以下のように「test」キューの「処理中のメッセージ」にメッセージが移動します。
3 回失敗した後、「dead-letter-queue」にメッセージが移動しました。これで Lambda がメッセージを呼び出すことはありません。
可視性タイムアウト
可視性タイムアウトとは、他のコンシューマーが再度同じメッセージを処理しないようにするため、そのメッセージを受信および処理できなくなる期間のことです。
以下の「編集」から変更です。
検証
EC2 からでメッセージを受信した後、どのような挙動をするか見てみます。まず以下のように「利用可能なメッセージ」が 1 の状態であることを確認します。検証のため、可視性タイムアウトを 2 分に設定しておきます。
次に EC2 でメッセージを受信します。その後「ポーリング」をクリックします。(可視性タイムアウトを 2 分に設定したのは、以下の進捗状況バーが100%になるまで可視性タイムアウトが解除されない時間を設定しておきたかったからです。)
可視性タイムアウトを超えない範囲では、コンシューマーはメッセージを受信できませんでした。