はじめに
AWSで非同期処理と言えばSQSが代表的です。
SQSは、処理したい内容をメッセージとしてキューに保管し、後続のサービスがそれを取り出して非同期で処理する仕組みを提供します。
本記事では、このSQSの非同期処理の仕組みを、AWS上での簡単なハンズオンを通じて具体的に理解することを目的とします。
この記事でやること
- プロデューサー用LambdaがSQSに中身の異なるメッセージを送信(50件)
- SQSがメッセージを保存
- コンシューマー用LambdaがキューメッセージをDynamoDBに順次格納
ハンズオンを通して、 SQSで保管されているキューメッセージがLambdaのポーリングによって少しずつDynamoDBに書きこまれていく(=非同期処理が行われていく) ことを確認したいと思います。
- 1つ目のLambda:プロデューサー(処理を依頼)
- SQS:処理内容を保管するキューストレージ
- 2つ目のLambda:コンシューマー(処理を実行)
- DynamoDB:処理結果の保存先
リソース作成
Cloudshell上でAWS CLIを用いながら、CloudFormationを活用して一挙にリソースを作成します。以降のコマンドはCloudshell上で実行してみてください。
cloudformationを用いてデプロイ
テンプレートで以下を作成します。
- プロデューサー用Lambda
- SQS
- コンシューマー用Lambda
- DyanmoDB
ハンズオン実施に必要なIAM権限やリソースの紐づけも含めて定義していきます。
cat > async-demo.yaml << 'EOF'
AWSTemplateFormatVersion: '2010-09-09'
Description: SQS Async Demo Hands-on
Resources:
AsyncDemoTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: async-demo-table-01
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: task_id
AttributeType: S
KeySchema:
- AttributeName: task_id
KeyType: HASH
AsyncDemoQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: async-demo-queue-01
VisibilityTimeout: 30
ProducerRole:
Type: AWS::IAM::Role
Properties:
RoleName: async-demo-producer-role-01
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: SQSSendPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- sqs:SendMessage
Resource: !GetAtt AsyncDemoQueue.Arn
ConsumerRole:
Type: AWS::IAM::Role
Properties:
RoleName: async-demo-consumer-role-01
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: ConsumerPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- dynamodb:PutItem
Resource: !GetAtt AsyncDemoTable.Arn
- Effect: Allow
Action:
- sqs:ReceiveMessage
- sqs:DeleteMessage
- sqs:GetQueueAttributes
- sqs:ChangeMessageVisibility
Resource: !GetAtt AsyncDemoQueue.Arn
ProducerFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: async-demo-producer-01
Runtime: python3.13
Handler: index.lambda_handler
Timeout: 30
Role: !GetAtt ProducerRole.Arn
Environment:
Variables:
QUEUE_URL: !Ref AsyncDemoQueue
Code:
ZipFile: |
import json
import uuid
import boto3
import os
sqs = boto3.client("sqs")
QUEUE_URL = os.environ["QUEUE_URL"]
def lambda_handler(event, context):
for i in range(50):
sqs.send_message(
QueueUrl=QUEUE_URL,
MessageBody=json.dumps({
"task_id": str(uuid.uuid4()),
"number": i
})
)
return {
"statusCode": 200,
"message": "50 messages sent"
}
ConsumerFunction:
Type: AWS::Lambda::Function
Properties:
FunctionName: async-demo-consumer-01
Runtime: python3.13
Handler: index.lambda_handler
Timeout: 30
ReservedConcurrentExecutions: 1
Role: !GetAtt ConsumerRole.Arn
Environment:
Variables:
TABLE_NAME: !Ref AsyncDemoTable
Code:
ZipFile: |
import json
import time
import boto3
import os
table_name = os.environ["TABLE_NAME"]
ddb = boto3.resource("dynamodb")
table = ddb.Table(table_name)
def lambda_handler(event, context):
for record in event["Records"]:
body = json.loads(record["body"])
print(f"Processing {body['task_id']}")
time.sleep(5)
table.put_item(
Item={
"task_id": body["task_id"],
"number": body["number"]
}
)
return {
"statusCode": 200
}
SQSTrigger:
Type: AWS::Lambda::EventSourceMapping
Properties:
EventSourceArn: !GetAtt AsyncDemoQueue.Arn
FunctionName: !Ref ConsumerFunction
BatchSize: 1
Enabled: true
Outputs:
QueueUrl:
Value: !Ref AsyncDemoQueue
ProducerFunction:
Value: !Ref ProducerFunction
EOF
■yamlファイルをもとにCloudFormationを実行
aws cloudformation deploy \
--template-file async-demo.yaml \
--stack-name async-demo-stack \
--capabilities CAPABILITY_NAMED_IAM
■【デプロイ確認】以下のコマンドで「CREATE_COMPLETE」が表示されることを確認。
aws cloudformation describe-stacks \
--stack-name async-demo-stack \
--query 'Stacks[0].StackStatus'
CREATE_COMPLETE
上記のデプロイで以下のリソースが作成されています。
①プロデューサー用Lambda
- 実行するとSQSにランダムなtask idデータを50件登録
②SQS
- プロデューサーから受けたメッセージを保持
- コンシューマー用Lambdaからのメッセージ取得を待機
③コンシューマー用Lambda
- SQSが保持するキューを順次DynamoDBに保管
- メッセージを取得してから3秒後にDynamoDBテーブルにPUT
(非同期処理を見るためにPUTの速度を落とす)
④DynamoDB
- task_idを主キーとするシンプルなテーブル
- プロデューサーによって生成されたランダムなtask idを保管
デモ準備
デモでは、プロデューサー用Lambdaの実行後に、SQSとDynamoDBの状態をリアルタイムで確認するためのスクリプトを使用します。まずは、以下のコマンドでモニタリングスクリプトを作成しておきましょう。
■スクリプトを作成
cat > monitor_async_demo.sh << 'EOF'
#!/bin/bash
QUEUE_NAME="async-demo-queue-01"
TABLE_NAME="async-demo-table-01"
# Queue URLを動的に取得
QUEUE_URL=$(aws sqs get-queue-url \
--queue-name "${QUEUE_NAME}" \
--query 'QueueUrl' \
--output text)
while true
do
clear
echo "===== $(date) ====="
echo
AVAILABLE=$(aws sqs get-queue-attributes \
--queue-url "${QUEUE_URL}" \
--attribute-names ApproximateNumberOfMessages \
--query 'Attributes.ApproximateNumberOfMessages' \
--output text)
INFLIGHT=$(aws sqs get-queue-attributes \
--queue-url "${QUEUE_URL}" \
--attribute-names ApproximateNumberOfMessagesNotVisible \
--query 'Attributes.ApproximateNumberOfMessagesNotVisible' \
--output text)
DDB_COUNT=$(aws dynamodb scan \
--table-name "${TABLE_NAME}" \
--select COUNT \
--query 'Count' \
--output text
)
echo "[SQS]"
echo "Available (未処理) : ${AVAILABLE}"
echo "In Flight (処理中): ${INFLIGHT}"
echo
echo "[DynamoDB]"
echo "保存済み件数 : ${DDB_COUNT}"
echo
echo "3秒後に再実行します..."
sleep 3
done
EOF
■スクリプトを実行できるように権限を調整
chmod +x monitor_async_demo.sh
このスクリプトは、3秒ごとにSQSが保持するデータ数(未処理データ、処理中データ)と、DynamoDBに保管されたデータ数を表示し続けるものになります。
出力例:
[SQS]
Available (未処理) : 80
In Flight (処理中): 20[DynamoDB]
保存済み件数 : 0
デモでは、Lambdaを実行 → スクリプトを実行 の順序で進め、SQSに登録されたデータが非同期にDynamoDBに保管されることを視覚的に理解していきます。
デモ実施
では50件のデータをSQSに投入します
aws lambda invoke \
--function-name async-demo-producer-01 \
response.json
以下のようにresponse.jsonのステータスが200だった場合は成功しています。
{ "statusCode": 200, "message": "50 messages sent" }
では、スクリプトを実行し、結果を見てみましょう。
./monitor_async_demo.sh
実行すると、以下のような変化が見られます。
[SQS]
Available : 33
In Flight : 20
[DynamoDB]
Count : 0
↓
[SQS]
Available : 21
In Flight : 13
[DynamoDB]
Count : 20
↓
[SQS]
Available : 0
In Flight : 0
[DynamoDB]
Count : 50
このことから、SQSに一気に登録されたデータが徐々にDynamoDBに登録されていく様子が分かるかと思います。
注)SQSのメッセージ数+DynamoDBに保管される数 ≠ 投入したデータ数の理由
SQSの裏の仕組みは複数のキュー保管サーバからなる分散アーキテクチャであるため、今回デモのために確認したSQSメトリクス値はどちらも大体の数を表示します。
- ApproximateNumberOfMessages(未処理)
- ApproximateNumberOfMessagesNotVisible(処理中)
このため、スクリプトでSQS、DynamoDBのデータ数を取得する瞬間は、合計の値が投入したデータ数と整合しない可能性があります。
▶参考:https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-sqs.html
以上でデモは完了となります。以下コマンドで一気に環境を掃除して終了しましょう。
aws cloudformation delete-stack \
--stack-name async-demo-stack
まとめ
本記事を通してSQSの非同期処理について理解が深まったでしょうか。
AWS公式ドキュメントでは、この非同期処理をより具体的にイメージしやすい例え話として、以下のような記事がありますので紹介します。
私はたまに立ち食い寿司のお店に行くことがあるのですが、そのお店では複数の寿司職人さんがいます。私は注文を書いた紙を所定の位置に置いておけば、手が空いた寿司職人さんが注文を受け取ってお寿司を握ってくれます。
(省略)
私は注文を書いた後、手の空いた寿司職人さんを捕まえられるまで探し続けなくても良いですし、依頼したお寿司を握ってもらっている時間も自由に使えるので「次に何を頼もうかな」と考えたり他の作業ができます。寿司職人さんは自分の手が空いたタイミングで次の注文を受け取ることで、今自分にできる適切な作業量を保てます。
引用元:https://aws.amazon.com/jp/builders-flash/202401/sqs-process-duplication/
この記事のハンズオンが、皆さんの理解の一助になれれば幸いです。
注意事項
本ブログに掲載している内容は、私個人の見解であり、所属する組織の立場や戦略、意見を代表するものではありません。
あくまでエンジニアとしての経験や考えを発信していますので、ご了承ください。
