狙いと概要
経緯
Amazon SQSで受信メッセージに対する処理をAWS Lambdaで(スケジューリングして)バッチ処理しようと思った時に、次の条件を考慮した実装をどのようにしようか考えていました。
- 1度の受信で取得できる最大メッセージ数が10 [SQS]
- メッセージ受信後の処理は、受信しては処理して明示的にメッセージを削除することが基本 [SQS]
- 1回の実行のタイムアウトが最大5分[Lambda]
1回のLambdaの起動でSQSから1回受信して10メッセージ以下の処理をするようにしては、Lambdaが持て余す感じがしますし、Lambdaを再帰呼び出しする方法があるにしても、タイムアウトの兼ね合いがありますので、それもちょっと取りづらい選択肢でした。
対応
結果考えたのが、Lambdaの1回の実行におけるタイムアウト5分以内にSQSのメッセージ受信に紐つく処理を、繰り返せる分だけ繰り返そうという方法です。
十分にマージンを確保する意図で、設定したタイムアウトの1分前までを目処にメッセージ受信からの処理の繰り返しをストップしてreturnするように考えました。
実装
Node.jsコーディング
LambdaのランタイムはNode.js 4.3を利用します。コーディング例は次のような感じです。Promiseを使うと良い感じに書けると思います。また、Lambdaの設定タイムアウトは環境変数を設定して process.env.TIMEOUT から取得するようにしました。
"use strict"
const aws = require('aws-sdk')
// SQSの処理
const sqs = {
// メッセージ受信(ロングポール)
receiveMessages: (url) => {
let params = {
QueueUrl: url,
MaxNumberOfMessages: 10,
WaitTimeSeconds: 10,
VisibilityTimeout: 60
}
const sqs = new aws.SQS()
return sqs.receiveMessage(params).promise()
},
// メッセージ削除
deleteMessages: (url, array) => {
let params = {
Entries: array,
QueueUrl: url
}
const sqs = new aws.SQS()
return sqs.deleteMessageBatch(params).promise()
},
// 繰り返し処理
iterativeExe: (url, fn, _startTime) => {
// 開始時間を取得
let startTime = _startTime || new Date().getTime()
// メッセージ受信
return sqs.receiveMessages(url).then(function(data) {
// メッセージがない場合は終了
if (!data.Messages) {
console.log('[log] Process ends because of no cancel messages from SQS.');
return Promise.resolve({
noMessage: true
})
}
// それぞれのメッセージに対する処理をPromise, reduceによる繰り返しで実行
return data.Messages.reduce((promise, message) => {
return promise.then((result) => {
return fn(message)
});
}, Promise.resolve({}))
}).then((r) => {
// メッセージの残りと経過時間をチェックして、繰り返すかreturnする
if (!r.noMessage) {
let time = (new Date().getTime() - startTime) / 1000 // [sec]
let timeout = Number(process.env.TIMEOUT) - 60
if (time <= timeout) {
console.log('[log] Passed time [s] -> ', time)
return sqs.iterativeExe(url, fn, startTime)
} else {
console.log('[log] Passed time [s] -> ', time)
console.log('[log] Execution time is over.')
return Promise.resolve({});
}
}
return Promise.resolve({});
})
}
}
// Lambda関数実行部分
exports.handler = (event, context, callback) => {
const url = 'https://sqs.ap-northeast-1.amazonaws.com/111111111111/iteration' // SQSのQueueUrl
sqs.iterativeExe(url, (message) => {
// メッセージ削除の例
let delArray = [{
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle
}]
return sqs.deleteMessages(url, delArray)
}).then((r) => {
console.log(r)
callback(null, 'Process completed.')
}).catch((e) => {
console.log(e)
callback('Error occurred.')
})
}
sqs.iterativeExe() という関数にして一般化しています。
sqs.iterativeExe(url, (message) => {
//
// messageに対する処理を記述してPromiseをreturnしておく
//
})
今回は、いきなりメッセージを削除する処理を例にしていますが、実際には message に対する処理を行って、最後に message を削除するようにPromiseチェーンを組むと良いでしょう。
Lambdaの設定
- この処理の特性上Lambdaの起動周期は5分以上に設定しましょう
-
process.env.TIMEOUTを利用できるよう、Lambdaの設定タイムアウトに合わせて環境変数を設定しておきましょう(秒単位で設定)。
まとめ
この方法がいいところは、一般化された sqs.iterativeExe() の中でそれぞれの message に対して行いたい処理をPromiseを返却する(関数の)形で記述するだけで、Lambdaの実行時間やメッセージ数との兼ね合いと気にせずに、受信メッセージに対する処理を実行できてしまうところです。もっと上手い方法があるかもしれませんが、ひとまず良しとさせてもらいたいと思います。