3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

目的

AWS SQSのイベントハンドラーとしてAWS LambdaのRubyを使ってみます。タイムアウトなどでいくつかはまりポイントがありました。デプロイにはServerlessFrameworkを使っています。

SQS

キューのタイプ

標準キー

順番の保証が無く、配信も複数行われる可能性があります。1秒あたりトランザクション数は無制限

fifo

順番の保証があり、配信は1つに制限されます。1秒あたりのトランザクション数は300

今回の選択

メッセージを重複処理されては困るのでfifoにしました。また本番で試したコードはDynamoDBと連携していたのでトランザクション数が制限されていた方がありがたかったです。

可視性タイムアウト

この時間を超えてしまうと、同じデータで別の処理が動いてしまいます。
デフォルトは30秒でこれはLambdaのタイムアウトと合わせる必要があります。もっと長くすることも可能です。最大12時間。

メッセージ受信待機時間

デフォルト20秒(最大値)。SQSはポーリングすると課金されていきます。なるべく長い方が良いです。Lambdaで利用する場合はポーリングは裏で勝手にやってくれるので、気にしなくてよいです。

AWS Lambda(SQSのハンドラー用の設定)

batchSize

バッチサイズはキューから一度に取ってくるデータのサイズ
バッチサイズは今回はfifoの最大バッチサイズである10
標準キーでは10000が最大サイズ

maximumBatchingWindow

バッチサイズがたまるまで待つ時間。今回は使用しませんでした。

functionResponseType

バッチでとってきた時にエラーを個別に返すかどうかの設定です。エラーとして返せば再びSQSのキューから配信されます。

コード

serverless frameworkの設定

serverless.yml
service: sample

frameworkVersion: '2'

provider:
  name: aws
  runtime: ruby2.7
  timeout: 30

functions:
  oauth:
    handler: handler.index
    events:
      - sqs:
          arn: "arn:aws:sqs:us-east-1:1111111111:sample.fifo"
          batchSize: 10
          functionResponseType: ReportBatchItemFailures

Lambdaに届くSQSのイベントデータ。AWSのサイトからコピーしてきました。ここではbodyにjsonを登録して利用します。

{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        },
        {
            "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
            "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082650636",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082650649"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        }
    ]
}

lambdaのコード

handler.rb
require 'json'

def execute(record)
  body = JSON.parse(record['body'])
  # 処理...
end

def index(event:, context:)
  batchItemFailures = event['Records'].map do |record|
    begin
      execute(record)
      nil
    rescue => e
      {itemIdentifier: record['messageId']}
    end
  end
  {batchItemFailures: batchItemFailures}
end

おまけ(マルチスレッド版)

handler2.rb
require 'json'
require 'parallel'

def execute(record)
  body = JSON.parse(record['body'])
  # 処理...
end

def index(event:, context:)
  batchItemFailures = Parallel.map(event['Records'], in_threads: 10) do |record|
    begin
      execute(record)
      nil
    rescue => e
      {itemIdentifier: record['messageId']}
    end
  end
  {batchItemFailures: batchItemFailures}
end
3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?