7
5

More than 5 years have passed since last update.

AWS Lambdaのタイムアウト条件下でAmazon SQSの受信メッセージを繰り返し処理する

Last updated at Posted at 2016-12-29

狙いと概要

経緯

Amazon SQSで受信メッセージに対する処理をAWS Lambdaで(スケジューリングして)バッチ処理しようと思った時に、次の条件を考慮した実装をどのようにしようか考えていました。

  • 1度の受信で取得できる最大メッセージ数が10 [SQS]
  • メッセージ受信後の処理は、受信しては処理して明示的にメッセージを削除することが基本 [SQS]
  • 1回の実行のタイムアウトが最大5分[Lambda]

1回のLambdaの起動でSQSから1回受信して10メッセージ以下の処理をするようにしては、Lambdaが持て余す感じがしますし、Lambdaを再帰呼び出しする方法があるにしても、タイムアウトの兼ね合いがありますので、それもちょっと取りづらい選択肢でした。

対応

結果考えたのが、Lambdaの1回の実行におけるタイムアウト5分以内にSQSのメッセージ受信に紐つく処理を、繰り返せる分だけ繰り返そうという方法です。

十分にマージンを確保する意図で、設定したタイムアウトの1分前までを目処にメッセージ受信からの処理の繰り返しをストップしてreturnするように考えました。

実装

Node.jsコーディング

LambdaのランタイムはNode.js 4.3を利用します。コーディング例は次のような感じです。Promiseを使うと良い感じに書けると思います。また、Lambdaの設定タイムアウトは環境変数を設定して process.env.TIMEOUT から取得するようにしました。

"use strict"

const aws = require('aws-sdk')

// SQSの処理
const sqs = {
  // メッセージ受信(ロングポール)
  receiveMessages: (url) => {
    let params = {
      QueueUrl: url,
      MaxNumberOfMessages: 10,
      WaitTimeSeconds: 10,
      VisibilityTimeout: 60
    }
    const sqs = new aws.SQS()
    return sqs.receiveMessage(params).promise()
  },
  // メッセージ削除
  deleteMessages: (url, array) => {
    let params = {
      Entries: array,
      QueueUrl: url
    }
    const sqs = new aws.SQS()
    return sqs.deleteMessageBatch(params).promise()
  },
  // 繰り返し処理
  iterativeExe: (url, fn, _startTime) => {
    // 開始時間を取得
    let startTime = _startTime || new Date().getTime()
    // メッセージ受信
    return sqs.receiveMessages(url).then(function(data) {
      // メッセージがない場合は終了
      if (!data.Messages) {
        console.log('[log] Process ends because of no cancel messages from SQS.');
        return Promise.resolve({
          noMessage: true
        })
      }
      // それぞれのメッセージに対する処理をPromise, reduceによる繰り返しで実行
      return data.Messages.reduce((promise, message) => {
        return promise.then((result) => {
          return fn(message)
        });
      }, Promise.resolve({}))
    }).then((r) => {
      // メッセージの残りと経過時間をチェックして、繰り返すかreturnする
      if (!r.noMessage) {
        let time = (new Date().getTime() - startTime) / 1000 // [sec]
        let timeout = Number(process.env.TIMEOUT) - 60
        if (time <= timeout) {
          console.log('[log] Passed time [s] -> ', time)
          return sqs.iterativeExe(url, fn, startTime)
        } else {
          console.log('[log] Passed time [s] -> ', time)
          console.log('[log] Execution time is over.')
          return Promise.resolve({});
        }
      }
      return Promise.resolve({});
    })
  }
}

// Lambda関数実行部分
exports.handler = (event, context, callback) => {
  const url = 'https://sqs.ap-northeast-1.amazonaws.com/111111111111/iteration' // SQSのQueueUrl
  sqs.iterativeExe(url, (message) => {
    // メッセージ削除の例
    let delArray = [{
      Id: message.MessageId,
      ReceiptHandle: message.ReceiptHandle
    }]
    return sqs.deleteMessages(url, delArray)
  }).then((r) => {
    console.log(r)
    callback(null, 'Process completed.')
  }).catch((e) => {
    console.log(e)
    callback('Error occurred.')
  })
}

sqs.iterativeExe() という関数にして一般化しています。

sqs.iterativeExe(url, (message) => {
    //
    // messageに対する処理を記述してPromiseをreturnしておく
    //
})

今回は、いきなりメッセージを削除する処理を例にしていますが、実際には message に対する処理を行って、最後に message を削除するようにPromiseチェーンを組むと良いでしょう。

Lambdaの設定

  • この処理の特性上Lambdaの起動周期は5分以上に設定しましょう
  • process.env.TIMEOUT を利用できるよう、Lambdaの設定タイムアウトに合わせて環境変数を設定しておきましょう(秒単位で設定)。 スクリーンショット 2016-12-30 21.41.16.png

まとめ

この方法がいいところは、一般化された sqs.iterativeExe() の中でそれぞれの message に対して行いたい処理をPromiseを返却する(関数の)形で記述するだけで、Lambdaの実行時間やメッセージ数との兼ね合いと気にせずに、受信メッセージに対する処理を実行できてしまうところです。もっと上手い方法があるかもしれませんが、ひとまず良しとさせてもらいたいと思います。

7
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
5