はじめに
SQSトリガーのLambda実行について、色々と検証してみたくなったので試してみた。
対象とする読者
- SQSの基本は知ってるけど、細かい動作まで知りたい人
- SQSトリガーのLambda実行環境をcdkで構築したい人
結論)わかったこと
- SQSのキューがFIFOの場合、紐づくデッドレターキュー(DLQ)も強制FIFO
- LambdaのOnSuccess/OnFailureに紐づけるキューはFIFO不可
- LambdaのDLQもFIFO不可
- 最大受信回数到達時、Lambdaエラー→可視性タイムアウト→DLQへ送信の順番
- SQSからのLambda実行は非同期呼び出しではない
→ LambdaのOnSuccess/OnFailure送信先やDLQは効かない
検証環境構築
以下の環境を構築する。
AWS CDKを用いて構築する。
(CDKって何??という方には以前私が投稿した記事をご参考までに)
各種キューとLambda関数
import { Duration, Stack, StackProps } from "aws-cdk-lib";
import * as sqs from "aws-cdk-lib/aws-sqs";
import { Runtime } from "aws-cdk-lib/aws-lambda";
import { NodejsFunction } from "aws-cdk-lib/aws-lambda-nodejs";
import { Construct } from "constructs";
import { SqsEventSource } from "aws-cdk-lib/aws-lambda-event-sources";
import { SqsDestination } from "aws-cdk-lib/aws-lambda-destinations";
export class SandboxStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
// SQSに紐付けるデッドレターキュー
const dlq = new sqs.Queue(this, "SandboxDeadLetterQueue", {
fifo: true,
queueName: "SandboxDeadLetterQueue.fifo",
visibilityTimeout: Duration.seconds(180),
contentBasedDeduplication: true,
});
// 受信Lambdaに紐付けるデッドレターキュー
const lambdaDlq = new sqs.Queue(this, "SandboxLambdaDeadLetterQueue", {
queueName: "SandboxLambdaDeadLetterQueue",
visibilityTimeout: Duration.seconds(180),
});
// 受信LambdaのonFailureに紐づけるキュー
const onFailureQueue = new sqs.Queue(this, "SandboxFailureQueue", {
queueName: "SandboxFailureQueue",
visibilityTimeout: Duration.seconds(180),
});
// 通常キュー
const queue = new sqs.Queue(this, "SandboxQueue", {
fifo: true,
queueName: "SandboxQueue.fifo",
visibilityTimeout: Duration.seconds(180),
contentBasedDeduplication: true,
receiveMessageWaitTime: Duration.seconds(10),
deadLetterQueue: {
maxReceiveCount: 1,
queue: dlq,
},
});
// メッセージ送信Lambda
const sender = new NodejsFunction(this, "SendHandler", {
functionName: "SendHandler",
runtime: Runtime.NODEJS_18_X,
entry: "src/send.ts",
handler: "handler",
environment: {
QUEUE_URL: queue.queueUrl,
},
});
// メッセージ受信Lambda
const receiver = new NodejsFunction(this, "ReceiveHandler", {
functionName: "ReceiveHandler",
runtime: Runtime.NODEJS_18_X,
entry: "src/receive.ts",
handler: "handler",
timeout: Duration.seconds(5),
onFailure: new SqsDestination(onFailureQueue),
deadLetterQueueEnabled: true,
deadLetterQueue: lambdaDlq,
retryAttempts: 0,
environment: {
ERROR_MESSAGE: "test error",
SLEEP_MS: "3000",
},
});
// 送信LambdaにSQSのsendMessage権限付与
queue.grantSendMessages(sender);
// 受信Lambdaのイベントソースに通常キューを追加
receiver.addEventSource(
new SqsEventSource(queue, {
batchSize: 1,
maxConcurrency: 2,
})
);
}
}
メッセージ送信Lambda
- マネジメントコンソールからテストで動かす想定
- 複数のメッセージを容易に発行できるようにしている
- FIFOキューでコンテンツに基づく重複削除を設定しているため、削除されないように異なるメッセージを送るようにしている
- 一度に送ると順番が前後したので、100msのスリープを入れている
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
import { v4 as uuidv4 } from "uuid";
type EventType = {
messageBody: string;
messageGroupId: string;
numberOfMessages: number;
};
export const handler = async (event: EventType): Promise<void> => {
const queueUrl = process.env.QUEUE_URL;
const sqsClient = new SQSClient();
const id = uuidv4();
for (let i = 0; i < event.numberOfMessages; i++) {
const message = `${event.messageBody}_${id}_${i}`;
const command = new SendMessageCommand({
QueueUrl: queueUrl,
MessageBody: message,
MessageGroupId: event.messageGroupId,
});
console.log(command);
await sqsClient.send(command);
await new Promise((resolve) => setTimeout(resolve, 100));
}
};
メッセージ受信Lambda
- 環境変数からスリープ時間と例外発生させるかどうかを指定できる
- スリープ時間はタイムアウトの動作検証をするために使用
import { SQSEvent, SQSHandler } from "aws-lambda";
export const handler: SQSHandler = async (event: SQSEvent): Promise<void> => {
const sleepMs = Number(process.env.SLEEP_MS);
const errorMessage = process.env.ERROR_MESSAGE;
await new Promise((resolve) => setTimeout(resolve, sleepMs));
console.log(event.Records);
if (errorMessage) {
throw new Error(errorMessage);
}
};
検証環境構築する中での気づき
cdkでの構築中にエラーとなり、以下2点に気づいた。
- FIFOキューの場合、紐づくデッドレターキュー(DLQ)も強制FIFO
- LambdaのOnSuccess/OnFailureやDLQに紐づけるキューはFIFO不可
公式ドキュメントにも記載あり。
FIFOキューのデッドレターキューは、FIFOキューでもある必要があります。同様に、標準キューのデッドレターキューは、標準キューでもある必要があります。
Lambda は、非同期呼び出しの次の送信先をサポートしています。SQS FIFO キューと SNS FIFO トピックはサポートされていないことに注意してください。
検証
SQSに紐づくDLQにはいつ入るか?
最大受信回数(maxReceiveCount)を1にして、受信Lambdaが必ずエラーするようにして検証した。
予想
- 送信Lambdaが通常キューにメッセージを送信
- 通常キューから受信Lambdaがポーリング
- 受信Lambdaがエラー
- SQSに紐付けたデッドレターキューにメッセージが移動
検証結果
- 送信Lambdaが通常キューにメッセージを送信
- 通常キューから受信Lambdaがポーリング
- 受信Lambdaがエラー
- 可視性タイムアウトの時間だけ「処理中のメッセージ」として待機
- SQSに紐付けたデッドレターキューにメッセージが移動
最大受信回数到達時、可視性タイムアウトだけ経過してからDLQに送られる
LambdaのDLQやOnFailureに指定したキューにいつ入るか?
検証した操作
- 受信Lambdaの処理中にエラー
- 受信Lambdaのタイムアウトエラー
いずれの場合も、LambdaのDLQやOnFailureで指定したキューに入ることはなかった。。。
よくよく調べると以下の結論に行き着いた。
SQSトリガーでのLambda実行は非同期呼び出しではないため、OnFailure/OnSuccessに指定したキューや、Lambdaで指定したDLQには入らない
公式ドキュメントにも記載あり。
Lambda はキューをポーリングし、Lambda 関数を、キューメッセージを含むイベントと共に同期的に呼び出します。
所感
似たような仕組みが色々あり、ややこしい。。。
公式ドキュメントってやっぱりちゃんと読まないとだめだなあ。
参考文献