Help us understand the problem. What is going on with this article?

【覚書】SQSイベント駆動でLambdaを動かす

やりたいこと

  • S3にデータを配置し、配置されたデータのイベントをSQSにpushする
  • SQSにenqueueされたイベントから、Lambdaをinvokeする
  • invokeされたLambdaで、S3に配置されたファイルを処理する

仕様の再確認

S3への権限付与

  • S3バケットにSQSへのイベント発行権限を与える。SQSのアクセス許可にプリンシパルとして加える。
{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-2:101619051410:PracticeQueue/SQSDefaultPolicy",
  "Statement": [
    {
      "Sid": "Sid1234567890",
      "Effect": "Allow",
      "Principal": "*",
      "Action": "SQS:SendMessage",
      "Resource": "arn:aws:sqs:us-west-2:1234567890:PracticeQueue",
      "Condition": {
        "StringEquals": {
          "aws:SourceArn": "arn:aws:s3:::your-bucket-name"
        }
      }
    }
  ]
}

Lambdaの挙動

  • LambdaはSQSキューをポーリングしている
    • キューでメッセージが利用可能になると、Lambdaは即座にメッセージを5バッチまで取ってしまうので注意
  • キューのメッセージを含むイベントで、関数を同期実行する
    • 同期実行なので、DLQの設定は不要
    • エラー時の再実行については、基本的にSQS側で扱う
  • メッセージは「バッチ」と呼ばれるひとまとまりで取得される
    • バッチサイズ(1つのバッチに含まれるメッセージの数)は、イベントソースマッピングを行う時に指定可能
    • 最大バッチサイズは10、最小バッチサイズは1
  • Lambdaは1回のポーリングで5バッチまでメッセージを取得し、Lambda関数を起動する

バッチのサンプル

{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "test",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "098f6bcd4621d373cade4e832627b4f6",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        },
        {
            "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
            "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
            "body": "test",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082650636",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082650649"
            },
            "messageAttributes": {},
            "md5OfBody": "098f6bcd4621d373cade4e832627b4f6",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        }
    ]
}
  • 1分あたり最大60インスタンスまでバッチを読み込みできる
  • イベントソースマッピングで並列処理できるバッチの最大数は1000
  • メッセージにバッチが読み込まれると、メッセージは可視性タイムアウトに入る
    • 関数が正常にバッチを処理すると、そのメッセージをキューから削除
    • 関数が正常に終了しなかった場合は、可視性タイムアウトの後、再びメッセージが表示される
      • スロットリング
      • エラー
      • タイムアウトなど
  • 1回のポーリングで5バッチのメッセージを取得し、関数を実行する
  • ソースキューの可視性タイムアウトをLambda関数のタイムアウトの最低6倍とっておけば、Lambda関数のタイミングでキューを処理できる
    • スロットリングによってLambda関数が実行出来なかった場合、次のバッチ実行までにタイムアウト1回分の猶予がとられるらしい
      • これによって、最低でも1つ分のスロットが空いて処理が走れるようになる
  • キューの再処理ポリシーでmaxReceiveCount(メッセージ再処理回数)5に設定すれば、Lambda関数の同時実行数が1であった場合でも、同時に5個キューイングされたメッセージが、スロットリングによって喪失することはない

SQSの設定

こんな感じにするヨロシ

    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": [
                "arn:aws:sqs:us-west-2:123456789102:QueueA",
                "arn:aws:sqs:us-west-2:123456789102:QueueB"
            ]
        }
    ]
}
  • DLQは必ず設定すること
    • その後のエラーハンドリングも設計しておく
  • 可視性タイムアウトの時間と、maxReceiveCount(最大処理回数)の設定に気をつけること

パラメータの決め方

考えねばならないパラメータ

  • Lambda関数の最大同時実行数
    • 並列して走らせるLambda関数の数
    • これを超えるとスロットリングが発生し、キューの処理が遅延する
  • Lambda関数のタイムアウト時間
    • これはそのままの意味
    • 最大15分
  • SQSキューの可視性タイムアウトの時間
    • このアーキテクチャに関してはこのパラメータが、メッセージの処理がエラーになった場合のリトライまでの待機時間に相当する
      • 可視性タイムアウトを抜けて利用可能になったメッセージは、即座にポーリングされてLambda関数で走る・・・というところがポイント
    • スロットリングしてしまった場合には、これにLambda関数のタイムアウト時間分が追加される
      • これのおかげで、スロットリングによる再実行の回数を少なく抑えられるしくみ
  • SQSキューの最大再処理回数
    • 何回リトライを許容するかを決めるパラメータ

決め方

  • 並列処理数上限
    • DBに接続したりする場合はコネクション数の上限とか、相手先の負荷次第
    • 5以上にしておくと、大量のジョブを同時に受けた時に、ポーリングの都度スロットリングされなくて済む
  • 実行時間上限
    • 処理の内容見合い
    • スロットリング時、現状の処理が終了しているか否かに関わらず、タイムアウト1回分追加の待ち時間が生じるので、不必要に実行時間を長くし過ぎると、スロットリング発生時に処理が極端に滞る
  • リトライの頻度
    • 平均実行時間、並列処理数上限などを合わせて考える
    • 平均的にこれくらいの時間で終わるよな・・・という辺りの可視性タイムアウトにしておけば、まずまず処理が進んでくれるはず
  • リトライ回数の上限
    • 何回リトライしてDLQに落とすか決める
    • リトライの頻度等と合わせて考える

まとめ

  • LambdaはSQSキューをポーリングしている
    • キューでメッセージが利用可能になると、Lambdaは即座にメッセージを取得する
    • 同時に5バッチ(1バッチ=1〜10メッセージ)取得され、それぞれにLambda関数が起動する
  • ジョブの管理等(リトライ・エラーハンドリング等)はSQS側で制御する
    • 最大再処理回数
    • 可視性タイムアウト
    • DLQ
  • スロットリングが発生すると、メッセージはエラーとしてキューに残る
    • 次のポーリングは、Lambda関数のタイムアウト1回分後に再開される
    • 取得されたメッセージは可視性タイムアウトの時間を過ぎると、再取得可能になる
      • つまり、この頻度でリトライを繰り返す
    • 処理が正常終了したら、メッセージは削除される
  • メッセージのリトライ回数が少なすぎると、DLQに落ちるメッセージが増えてしまうので注意
Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away