目的
AWS SQSのイベントハンドラーとしてAWS LambdaのRubyを使ってみます。タイムアウトなどでいくつかはまりポイントがありました。デプロイにはServerlessFrameworkを使っています。
SQS
キューのタイプ
標準キー
順番の保証が無く、配信も複数行われる可能性があります。1秒あたりトランザクション数は無制限
fifo
順番の保証があり、配信は1つに制限されます。1秒あたりのトランザクション数は300
今回の選択
メッセージを重複処理されては困るのでfifoにしました。また本番で試したコードはDynamoDBと連携していたのでトランザクション数が制限されていた方がありがたかったです。
可視性タイムアウト
この時間を超えてしまうと、同じデータで別の処理が動いてしまいます。
デフォルトは30秒でこれはLambdaのタイムアウトと合わせる必要があります。もっと長くすることも可能です。最大12時間。
メッセージ受信待機時間
デフォルト20秒(最大値)。SQSはポーリングすると課金されていきます。なるべく長い方が良いです。Lambdaで利用する場合はポーリングは裏で勝手にやってくれるので、気にしなくてよいです。
AWS Lambda(SQSのハンドラー用の設定)
batchSize
バッチサイズはキューから一度に取ってくるデータのサイズ
バッチサイズは今回はfifoの最大バッチサイズである10
標準キーでは10000が最大サイズ
maximumBatchingWindow
バッチサイズがたまるまで待つ時間。今回は使用しませんでした。
functionResponseType
バッチでとってきた時にエラーを個別に返すかどうかの設定です。エラーとして返せば再びSQSのキューから配信されます。
コード
serverless frameworkの設定
service: sample
frameworkVersion: '2'
provider:
name: aws
runtime: ruby2.7
timeout: 30
functions:
oauth:
handler: handler.index
events:
- sqs:
arn: "arn:aws:sqs:us-east-1:1111111111:sample.fifo"
batchSize: 10
functionResponseType: ReportBatchItemFailures
Lambdaに届くSQSのイベントデータ。AWSのサイトからコピーしてきました。ここではbodyにjsonを登録して利用します。
{
"Records": [
{
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185"
},
"messageAttributes": {},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"awsRegion": "us-east-2"
},
{
"messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
"receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082650636",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082650649"
},
"messageAttributes": {},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
"awsRegion": "us-east-2"
}
]
}
lambdaのコード
require 'json'
def execute(record)
body = JSON.parse(record['body'])
# 処理...
end
def index(event:, context:)
batchItemFailures = event['Records'].map do |record|
begin
execute(record)
nil
rescue => e
{itemIdentifier: record['messageId']}
end
end
{batchItemFailures: batchItemFailures}
end
おまけ(マルチスレッド版)
require 'json'
require 'parallel'
def execute(record)
body = JSON.parse(record['body'])
# 処理...
end
def index(event:, context:)
batchItemFailures = Parallel.map(event['Records'], in_threads: 10) do |record|
begin
execute(record)
nil
rescue => e
{itemIdentifier: record['messageId']}
end
end
{batchItemFailures: batchItemFailures}
end