1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

SQSとLambdaの「部分的なバッチレスポンス」を試す

Posted at

背景

SQSとLambdaを連携させるアプリケーションにおいて、メッセージ単位でのエラー処理を考える必要があり、Lambda関数に具備されている「部分的なバッチレスポンス」がどのような仕組みなのかを理解するために実際に触って試した際の記録。

検証構成

SQS -> Lambda

検証

SQS

あらかじめ適当な名前でSQSキューを作っておく。今回は標準キューを採用。

Lambda

  • SQSキューからバッチでメッセージをポーリングし、メッセージ本文をログ(CloudWatchLogs)に出力
  • 部分的なバッチレスポンスの挙動を確認したいので、特定のメッセージのみ例外を発生させる(バッチで取得した最後の要素で例外を発生させる)
  • 例外が発生したメッセージのIDをレスポンスに含める
  • 部分的なバッチレスポンスによって、処理に失敗したメッセージだけがSQSキューに残り続けることを確認する。
import json
import logging

# ログの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    batch_item_failures = []

    # SQSメッセージを取得
    total_records = len(event['Records'])  # メッセージの総数

    for index, record in enumerate(event['Records']):
        try:
            # 最後のメッセージを強制的に失敗させる
            if index == total_records - 1:
                raise Exception("Forced failure for last message testing")

            # メッセージ本文を取得
            message_body = record['body']

            # ログに出力
            logger.info(f"Received message: {message_body}")
            
        except Exception as e:
            # エラーログを出力
            logger.error(f"Error processing message: {record['messageId']}, error: {str(e)}")
            # 処理に失敗したメッセージのIDをbatchItemFailuresに追加
            batch_item_failures.append({'itemIdentifier': record['messageId']})
    
    return {
        'statusCode': 200,
        'body': json.dumps('Message processed successfully'),
        'batchItemFailures': batch_item_failures
    }

SQSのトリガー設定

バッチサイズ

同時に受信できるメッセージの最大数。

バッチウィンドゥ

バッチされたメッセージを収集するまでの最大時間。例えば、バッチサイズ10の場合、10件のメッセージが収集できた段階で関数実行が開始されるが、バッチウィンドゥの時間までに10件揃わなかった際は、その時点で処理が開始される。

最大同時実行数

Lambda関数が同時に処理できるインスタンスの最大値。自ら上限を設けるために使用する。未設定の場合は、Lambda関数のデフォルト上限値1000まで拡張することとなる。

バッチ項目の失敗を報告

「部分的なバッチレスポンス」を実装する場合は有効にする。Lambda関数内のメッセージ単位の処理で失敗したメッセージのみをキューに残したい場合はこの項目を有効にする必要がある。今回は有効にした。

SQSへのアクセス権限の設定

Lambda関数に設定したサービスロールにSQSへのアクセス権限が無いとトリガー設定時にエラーになる。

trigger の作成中にエラー が発生しました: The provided execution role does not have permissions to call ReceiveMessage on SQS

ChatGPTくんの回答を元に、サービスロールへ以下の権限を追加。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ReceiveMessage",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:ChangeMessageVisibility"
            ],
            "Resource": "arn:aws:sqs:REGION:ACCOUNT_ID:QUEUE_NAME"
        }
    ]
}

権限追加後、トリガーを設定できたことを確認。

動作確認

SQSにまとめてメッセージをPushし、バッチで取得した最後のメッセージだけキューから削除されないかを確認する。
AWS CLIを使ってSQSキューにメッセージを格納する。

#!/bin/bash

# キューのURLを設定
QUEUE_URL="https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>"

# メッセージのバッチを作成して送信
aws sqs send-message-batch \
    --queue-url $QUEUE_URL \
    --entries file://batch-messages.json

batch-messages.jsonに10件のサンプルメッセージを用意し、上記スクリプトを実行する。

Lambda関数のログを確認し、サンプルメッセージの内容が出力されていることを確認する。

[INFO]	2024-10-06T02:09:26.973Z	3882f3d3-6d34-5295-ab1a-aca89ebbe8a4	Received message: This is message 5

取得したメッセージのうち、最後のメッセージは失敗していることを確認。

[ERROR]	2024-10-06T02:10:17.662Z	6976d037-e60b-572d-a283-8f4c15acd142	Error processing message: 08ad583d-9c0a-43af-ab71-4706977ee5f5, error: Forced failure for last message testing

この間、DLQ等は設定していないのでLambda君はひたすらキューに入っているメッセージを頑張って処理しようとしています。SQSのメトリクスから処理中のメッセージ数を確認します。
(ApproximateNumberOfMessagesNotVisible=処理中のメッセージ総数)

{
    "Attributes": {
        "ApproximateNumberOfMessagesNotVisible": "1"
    }
}

ログの内容からどうやら2番目のメッセージがキューに残っているようなので、Lambda関数の中で意図的に例外を発生させていた行をコメントアウトして確認します。(SQSキューは標準キューを使用しているので順序が保証されていない)

[INFO]	2024-10-06T02:25:35.706Z	2eed0358-1a55-541a-9749-5c23ed121609	Received message: This is message 2

想定通り、部分的なバッチレスポンスによって処理に失敗した特定のメッセージだけをキューに残すことができるようです。
しかし、今回のように必ずエラーになってしまうような自体が発生した場合、SQS及びLambda関数はいたずらにコストばかりを消費し続けてしまうので対策を考えます。スノーボールアンチパターン

エラーハンドリング

おなじみデッドレターキュー(DLQ)を使って、今回の問題に対応します。

DLQの設定

新しくDLQの設定として設定するキューを適当に作成し、既存のキューのDLQの設定として設定します。

最大受信数

メッセージをDLQへ移行するまで最大受信数。(再処理可能になった回数)

動作確認(DLQ)

同じスクリプトを実行してメッセージを10件送信します。
10件程度のエラーメッセージを確認したあと、先程設定したDLQにメッセージが1件格納され、Lambda関数の繰り返し処理がストップしたことを確認しました。

スクリーンショット 2024-10-06 121252.png

スクリーンショット 2024-10-06 121759.png

とりあえず処理できなかった可哀想なメッセージの中身を確認します。

{
    "Messages": [
        {
            "MessageId": "c6d2ea93-27e9-4884-b837-572b278bcf80",
            "ReceiptHandle": "AQEBrmoXMER5AzSCnFZq6nYmyizrjHNST9QNqu6y349EaehUCzYLO19XyCzMsnkSpI+Is4ROoQN+RqL+xtrN+UDykxGriTeFm/VMQzoKcXaogYbL2WXaXjnZ4/QBlOP/xLIToK4ZYWwraizQcFno787662yJXLvkCb5Fm9NNDpXU1r4eOSBV+puYieokx9PS5KgswKcdP2XF05Fm8wYlXELsErFnlyhL2zEPOPUX+gdiCtAb7l2jxu4/R0GS8Ej7zfaTjayqAH8ZALIHXyKyJqUCMUqiGywByBoZ69EkxzBoVOed5NhQ7LDLISbHtHHhYviYglSXTYUgboTY5VC9z2BaYJpPYb8QsDvN8QvCwQFxQ0cRbhjW5Rw9tLyuGAwMwOWQLd9vbLF00xvsOsBujo+kPQ==",
            "MD5OfBody": "0acc1def8b8d759897ed1ab7a4a456b6",
            "Body": "This is message 10"
        }
    ]
}

今回は10番目のメッセージが処理されず、最終的にDLQへ配信されたようです。

Lambda関数で発生していたトラブルが解決されたと仮定して、DLQに格納されたメッセージをもとのキューに戻し正しく実行できるかを確認します。※再度、Lambda関数内で意図的に例外を発生させていた行をコメントアウトしておきます。

「DLQ再処理の開始」→「再処理のためにソースキューに入れる」から元のキューに戻します。

スクリーンショット 2024-10-06 122745.png

もとのキューに戻り、無事Lambda関数で処理が行われ当該メッセージもログに出力されていました。

[INFO]	2024-10-06T03:28:26.958Z	645cff66-f90d-5cf7-b225-a01aff0a8266	Received message: This is message 10

まとめ

  • 部分的なバッチレスポンスを使用することで、特定のメッセージのみをSQSキューに残すことができる
  • DLQを設定することで、一定回数処理に失敗したメッセージを別のキューに移すことができる
1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?