LoginSignup
4
2

【AWS CDK / SQS+Lambda】SQSからのLambda呼び出しについて検証してみた

Last updated at Posted at 2024-01-22

はじめに

SQSトリガーのLambda実行について、色々と検証してみたくなったので試してみた。

対象とする読者

  • SQSの基本は知ってるけど、細かい動作まで知りたい人
  • SQSトリガーのLambda実行環境をcdkで構築したい人

結論)わかったこと

  • SQSのキューがFIFOの場合、紐づくデッドレターキュー(DLQ)も強制FIFO
  • LambdaのOnSuccess/OnFailureに紐づけるキューはFIFO不可
  • LambdaのDLQもFIFO不可
  • 最大受信回数到達時、Lambdaエラー→可視性タイムアウト→DLQへ送信の順番
  • SQSからのLambda実行は非同期呼び出しではない
    → LambdaのOnSuccess/OnFailure送信先やDLQは効かない

検証環境構築

以下の環境を構築する。

AWS CDKを用いて構築する。
(CDKって何??という方には以前私が投稿した記事をご参考までに)

各種キューとLambda関数

lib/sandbox-stack.ts
import { Duration, Stack, StackProps } from "aws-cdk-lib";
import * as sqs from "aws-cdk-lib/aws-sqs";
import { Runtime } from "aws-cdk-lib/aws-lambda";
import { NodejsFunction } from "aws-cdk-lib/aws-lambda-nodejs";
import { Construct } from "constructs";
import { SqsEventSource } from "aws-cdk-lib/aws-lambda-event-sources";
import { SqsDestination } from "aws-cdk-lib/aws-lambda-destinations";

export class SandboxStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // SQSに紐付けるデッドレターキュー
    const dlq = new sqs.Queue(this, "SandboxDeadLetterQueue", {
      fifo: true,
      queueName: "SandboxDeadLetterQueue.fifo",
      visibilityTimeout: Duration.seconds(180),
      contentBasedDeduplication: true,
    });

    // 受信Lambdaに紐付けるデッドレターキュー
    const lambdaDlq = new sqs.Queue(this, "SandboxLambdaDeadLetterQueue", {
      queueName: "SandboxLambdaDeadLetterQueue",
      visibilityTimeout: Duration.seconds(180),
    });

    // 受信LambdaのonFailureに紐づけるキュー
    const onFailureQueue = new sqs.Queue(this, "SandboxFailureQueue", {
      queueName: "SandboxFailureQueue",
      visibilityTimeout: Duration.seconds(180),
    });

    // 通常キュー
    const queue = new sqs.Queue(this, "SandboxQueue", {
      fifo: true,
      queueName: "SandboxQueue.fifo",
      visibilityTimeout: Duration.seconds(180),
      contentBasedDeduplication: true,
      receiveMessageWaitTime: Duration.seconds(10),
      deadLetterQueue: {
        maxReceiveCount: 1,
        queue: dlq,
      },
    });

    // メッセージ送信Lambda
    const sender = new NodejsFunction(this, "SendHandler", {
      functionName: "SendHandler",
      runtime: Runtime.NODEJS_18_X,
      entry: "src/send.ts",
      handler: "handler",
      environment: {
        QUEUE_URL: queue.queueUrl,
      },
    });

    // メッセージ受信Lambda
    const receiver = new NodejsFunction(this, "ReceiveHandler", {
      functionName: "ReceiveHandler",
      runtime: Runtime.NODEJS_18_X,
      entry: "src/receive.ts",
      handler: "handler",
      timeout: Duration.seconds(5),
      onFailure: new SqsDestination(onFailureQueue),
      deadLetterQueueEnabled: true,
      deadLetterQueue: lambdaDlq,
      retryAttempts: 0,
      environment: {
        ERROR_MESSAGE: "test error",
        SLEEP_MS: "3000",
      },
    });

    // 送信LambdaにSQSのsendMessage権限付与
    queue.grantSendMessages(sender);
    // 受信Lambdaのイベントソースに通常キューを追加
    receiver.addEventSource(
      new SqsEventSource(queue, {
        batchSize: 1,
        maxConcurrency: 2,
      })
    );
  }
}

メッセージ送信Lambda

  • マネジメントコンソールからテストで動かす想定
  • 複数のメッセージを容易に発行できるようにしている
  • FIFOキューでコンテンツに基づく重複削除を設定しているため、削除されないように異なるメッセージを送るようにしている
  • 一度に送ると順番が前後したので、100msのスリープを入れている
src/send.ts
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
import { v4 as uuidv4 } from "uuid";

type EventType = {
  messageBody: string;
  messageGroupId: string;
  numberOfMessages: number;
};

export const handler = async (event: EventType): Promise<void> => {
  const queueUrl = process.env.QUEUE_URL;
  const sqsClient = new SQSClient();
  const id = uuidv4();
  for (let i = 0; i < event.numberOfMessages; i++) {
    const message = `${event.messageBody}_${id}_${i}`;
    const command = new SendMessageCommand({
      QueueUrl: queueUrl,
      MessageBody: message,
      MessageGroupId: event.messageGroupId,
    });
    console.log(command);
    await sqsClient.send(command);
    await new Promise((resolve) => setTimeout(resolve, 100));
  }
};

メッセージ受信Lambda

  • 環境変数からスリープ時間と例外発生させるかどうかを指定できる
  • スリープ時間はタイムアウトの動作検証をするために使用
src/receive.ts
import { SQSEvent, SQSHandler } from "aws-lambda";

export const handler: SQSHandler = async (event: SQSEvent): Promise<void> => {
  const sleepMs = Number(process.env.SLEEP_MS);
  const errorMessage = process.env.ERROR_MESSAGE;
  await new Promise((resolve) => setTimeout(resolve, sleepMs));
  console.log(event.Records);
  if (errorMessage) {
    throw new Error(errorMessage);
  }
};

検証環境構築する中での気づき

cdkでの構築中にエラーとなり、以下2点に気づいた。

  • FIFOキューの場合、紐づくデッドレターキュー(DLQ)も強制FIFO
  • LambdaのOnSuccess/OnFailureやDLQに紐づけるキューはFIFO不可

公式ドキュメントにも記載あり。

FIFOキューのデッドレターキューは、FIFOキューでもある必要があります。同様に、標準キューのデッドレターキューは、標準キューでもある必要があります。

Lambda は、非同期呼び出しの次の送信先をサポートしています。SQS FIFO キューと SNS FIFO トピックはサポートされていないことに注意してください。

検証

SQSに紐づくDLQにはいつ入るか?

最大受信回数(maxReceiveCount)を1にして、受信Lambdaが必ずエラーするようにして検証した。

予想

  1. 送信Lambdaが通常キューにメッセージを送信
  2. 通常キューから受信Lambdaがポーリング
  3. 受信Lambdaがエラー
  4. SQSに紐付けたデッドレターキューにメッセージが移動

検証結果

  1. 送信Lambdaが通常キューにメッセージを送信
  2. 通常キューから受信Lambdaがポーリング
  3. 受信Lambdaがエラー
  4. 可視性タイムアウトの時間だけ「処理中のメッセージ」として待機
  5. SQSに紐付けたデッドレターキューにメッセージが移動

最大受信回数到達時、可視性タイムアウトだけ経過してからDLQに送られる

LambdaのDLQやOnFailureに指定したキューにいつ入るか?

検証した操作

  • 受信Lambdaの処理中にエラー
  • 受信Lambdaのタイムアウトエラー

いずれの場合も、LambdaのDLQやOnFailureで指定したキューに入ることはなかった。。。

よくよく調べると以下の結論に行き着いた。

SQSトリガーでのLambda実行は非同期呼び出しではないため、OnFailure/OnSuccessに指定したキューや、Lambdaで指定したDLQには入らない

公式ドキュメントにも記載あり。

Lambda はキューをポーリングし、Lambda 関数を、キューメッセージを含むイベントと共に同期的に呼び出します。

所感

似たような仕組みが色々あり、ややこしい。。。
公式ドキュメントってやっぱりちゃんと読まないとだめだなあ。

参考文献

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2