0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【ハンズオン】SQSの非同期処理を体験してみる

0
Posted at

はじめに

AWSで非同期処理と言えばSQSが代表的です。
SQSは、処理したい内容をメッセージとしてキューに保管し、後続のサービスがそれを取り出して非同期で処理する仕組みを提供します。

本記事では、このSQSの非同期処理の仕組みを、AWS上での簡単なハンズオンを通じて具体的に理解することを目的とします。

この記事でやること

  • プロデューサー用LambdaがSQSに中身の異なるメッセージを送信(50件)
  • SQSがメッセージを保存
  • コンシューマー用LambdaがキューメッセージをDynamoDBに順次格納

ハンズオンを通して、 SQSで保管されているキューメッセージがLambdaのポーリングによって少しずつDynamoDBに書きこまれていく(=非同期処理が行われていく) ことを確認したいと思います。

非同期処理のアーキテクチャ概要.png

  • 1つ目のLambda:プロデューサー(処理を依頼)
  • SQS:処理内容を保管するキューストレージ
  • 2つ目のLambda:コンシューマー(処理を実行)
  • DynamoDB:処理結果の保存先

リソース作成

Cloudshell上でAWS CLIを用いながら、CloudFormationを活用して一挙にリソースを作成します。以降のコマンドはCloudshell上で実行してみてください。

cloudformationを用いてデプロイ

テンプレートで以下を作成します。

  • プロデューサー用Lambda
  • SQS
  • コンシューマー用Lambda
  • DyanmoDB

ハンズオン実施に必要なIAM権限やリソースの紐づけも含めて定義していきます。

cat > async-demo.yaml << 'EOF'
AWSTemplateFormatVersion: '2010-09-09'
Description: SQS Async Demo Hands-on

Resources:

  AsyncDemoTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: async-demo-table-01
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: task_id
          AttributeType: S
      KeySchema:
        - AttributeName: task_id
          KeyType: HASH

  AsyncDemoQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: async-demo-queue-01
      VisibilityTimeout: 30

  ProducerRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: async-demo-producer-role-01

      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole

      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

      Policies:
        - PolicyName: SQSSendPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - sqs:SendMessage
                Resource: !GetAtt AsyncDemoQueue.Arn

  ConsumerRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: async-demo-consumer-role-01

      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole

      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

      Policies:
        - PolicyName: ConsumerPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - dynamodb:PutItem
                Resource: !GetAtt AsyncDemoTable.Arn

              - Effect: Allow
                Action:
                  - sqs:ReceiveMessage
                  - sqs:DeleteMessage
                  - sqs:GetQueueAttributes
                  - sqs:ChangeMessageVisibility
                Resource: !GetAtt AsyncDemoQueue.Arn

  ProducerFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: async-demo-producer-01
      Runtime: python3.13
      Handler: index.lambda_handler
      Timeout: 30
      Role: !GetAtt ProducerRole.Arn

      Environment:
        Variables:
          QUEUE_URL: !Ref AsyncDemoQueue

      Code:
        ZipFile: |
          import json
          import uuid
          import boto3
          import os

          sqs = boto3.client("sqs")
          QUEUE_URL = os.environ["QUEUE_URL"]

          def lambda_handler(event, context):

              for i in range(50):
                  sqs.send_message(
                      QueueUrl=QUEUE_URL,
                      MessageBody=json.dumps({
                          "task_id": str(uuid.uuid4()),
                          "number": i
                      })
                  )

              return {
                  "statusCode": 200,
                  "message": "50 messages sent"
              }

  ConsumerFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: async-demo-consumer-01
      Runtime: python3.13
      Handler: index.lambda_handler
      Timeout: 30
      ReservedConcurrentExecutions: 1
      Role: !GetAtt ConsumerRole.Arn

      Environment:
        Variables:
          TABLE_NAME: !Ref AsyncDemoTable

      Code:
        ZipFile: |
          import json
          import time
          import boto3
          import os

          table_name = os.environ["TABLE_NAME"]

          ddb = boto3.resource("dynamodb")
          table = ddb.Table(table_name)

          def lambda_handler(event, context):

              for record in event["Records"]:

                  body = json.loads(record["body"])

                  print(f"Processing {body['task_id']}")

                  time.sleep(5)

                  table.put_item(
                      Item={
                          "task_id": body["task_id"],
                          "number": body["number"]
                      }
                  )

              return {
                  "statusCode": 200
              }

  SQSTrigger:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      EventSourceArn: !GetAtt AsyncDemoQueue.Arn
      FunctionName: !Ref ConsumerFunction
      BatchSize: 1
      Enabled: true

Outputs:

  QueueUrl:
    Value: !Ref AsyncDemoQueue

  ProducerFunction:
    Value: !Ref ProducerFunction

EOF

■yamlファイルをもとにCloudFormationを実行

aws cloudformation deploy \
  --template-file async-demo.yaml \
  --stack-name async-demo-stack \
  --capabilities CAPABILITY_NAMED_IAM

■【デプロイ確認】以下のコマンドで「CREATE_COMPLETE」が表示されることを確認。

aws cloudformation describe-stacks \
  --stack-name async-demo-stack \
  --query 'Stacks[0].StackStatus'

CREATE_COMPLETE

上記のデプロイで以下のリソースが作成されています。

①プロデューサー用Lambda

  • 実行するとSQSにランダムなtask idデータを50件登録

②SQS
  • プロデューサーから受けたメッセージを保持
  • コンシューマー用Lambdaからのメッセージ取得を待機

③コンシューマー用Lambda
  • SQSが保持するキューを順次DynamoDBに保管
  • メッセージを取得してから3秒後にDynamoDBテーブルにPUT
    (非同期処理を見るためにPUTの速度を落とす)

④DynamoDB
  • task_idを主キーとするシンプルなテーブル
  • プロデューサーによって生成されたランダムなtask idを保管

デモ準備

デモでは、プロデューサー用Lambdaの実行後に、SQSとDynamoDBの状態をリアルタイムで確認するためのスクリプトを使用します。まずは、以下のコマンドでモニタリングスクリプトを作成しておきましょう。

■スクリプトを作成

cat > monitor_async_demo.sh << 'EOF'
#!/bin/bash

QUEUE_NAME="async-demo-queue-01"
TABLE_NAME="async-demo-table-01"

# Queue URLを動的に取得
QUEUE_URL=$(aws sqs get-queue-url \
  --queue-name "${QUEUE_NAME}" \
  --query 'QueueUrl' \
  --output text)

while true
do
    clear

    echo "===== $(date) ====="
    echo

    AVAILABLE=$(aws sqs get-queue-attributes \
      --queue-url "${QUEUE_URL}" \
      --attribute-names ApproximateNumberOfMessages \
      --query 'Attributes.ApproximateNumberOfMessages' \
      --output text)

    INFLIGHT=$(aws sqs get-queue-attributes \
      --queue-url "${QUEUE_URL}" \
      --attribute-names ApproximateNumberOfMessagesNotVisible \
      --query 'Attributes.ApproximateNumberOfMessagesNotVisible' \
      --output text)

    DDB_COUNT=$(aws dynamodb scan \
      --table-name "${TABLE_NAME}" \
      --select COUNT \
      --query 'Count' \
      --output text

    )

    echo "[SQS]"
    echo "Available (未処理) : ${AVAILABLE}"
    echo "In Flight (処理中): ${INFLIGHT}"

    echo
    echo "[DynamoDB]"
    echo "保存済み件数      : ${DDB_COUNT}"

    echo
    echo "3秒後に再実行します..."
    sleep 3
done
EOF

■スクリプトを実行できるように権限を調整

chmod +x monitor_async_demo.sh

このスクリプトは、3秒ごとにSQSが保持するデータ数(未処理データ、処理中データ)と、DynamoDBに保管されたデータ数を表示し続けるものになります。

出力例:

[SQS]
Available (未処理) : 80
In Flight (処理中): 20

[DynamoDB]
保存済み件数 : 0

デモでは、Lambdaを実行 → スクリプトを実行 の順序で進め、SQSに登録されたデータが非同期にDynamoDBに保管されることを視覚的に理解していきます。

デモ実施

では50件のデータをSQSに投入します

aws lambda invoke \
  --function-name async-demo-producer-01 \
  response.json

以下のようにresponse.jsonのステータスが200だった場合は成功しています。

{ "statusCode": 200, "message": "50 messages sent" }

では、スクリプトを実行し、結果を見てみましょう。

./monitor_async_demo.sh

実行すると、以下のような変化が見られます。

[SQS]
Available : 33
In Flight : 20

[DynamoDB]
Count : 0

↓

[SQS]
Available : 21
In Flight : 13

[DynamoDB]
Count : 20

↓

[SQS]
Available : 0
In Flight : 0

[DynamoDB]
Count : 50

このことから、SQSに一気に登録されたデータが徐々にDynamoDBに登録されていく様子が分かるかと思います。

注)SQSのメッセージ数+DynamoDBに保管される数 ≠ 投入したデータ数の理由

SQSの裏の仕組みは複数のキュー保管サーバからなる分散アーキテクチャであるため、今回デモのために確認したSQSメトリクス値はどちらも大体の数を表示します。

  • ApproximateNumberOfMessages(未処理)
  • ApproximateNumberOfMessagesNotVisible(処理中)
    このため、スクリプトでSQS、DynamoDBのデータ数を取得する瞬間は、合計の値が投入したデータ数と整合しない可能性があります。
    ▶参考:https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-sqs.html

以上でデモは完了となります。以下コマンドで一気に環境を掃除して終了しましょう。

aws cloudformation delete-stack \
  --stack-name async-demo-stack

まとめ

本記事を通してSQSの非同期処理について理解が深まったでしょうか。
AWS公式ドキュメントでは、この非同期処理をより具体的にイメージしやすい例え話として、以下のような記事がありますので紹介します。

私はたまに立ち食い寿司のお店に行くことがあるのですが、そのお店では複数の寿司職人さんがいます。私は注文を書いた紙を所定の位置に置いておけば、手が空いた寿司職人さんが注文を受け取ってお寿司を握ってくれます。
(省略)
私は注文を書いた後、手の空いた寿司職人さんを捕まえられるまで探し続けなくても良いですし、依頼したお寿司を握ってもらっている時間も自由に使えるので「次に何を頼もうかな」と考えたり他の作業ができます。寿司職人さんは自分の手が空いたタイミングで次の注文を受け取ることで、今自分にできる適切な作業量を保てます。
引用元:https://aws.amazon.com/jp/builders-flash/202401/sqs-process-duplication/

この記事のハンズオンが、皆さんの理解の一助になれれば幸いです。

注意事項
本ブログに掲載している内容は、私個人の見解であり、所属する組織の立場や戦略、意見を代表するものではありません。
あくまでエンジニアとしての経験や考えを発信していますので、ご了承ください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?