AWS Lambdaの無限ループパターンで何かできないかなと考えてたら思いついたので試してみた話。
構成
KinesisのストリームからAWS Lambdaを起動して、処理が完了したら再度AWS Kinesisにプッシュすることで無限ループを行います。
この無限ループ中でSQSからメッセージを受け取り、メッセージ処理用のAWS Lambdaを起動して、メッセージを処理できたらSQSからメッセージを削除するという動きになります。
もし、失敗した場合はメッセージがそのままSQSに残るので、Dead Letter Queueに入るまでは何度でもリトライ可能になります。
コード
// require
var Promise = require('bluebird');
var AWS = require('aws-sdk');
var config = require('config');
var uuid = require('node-uuid');
var sqs = Promise.promisifyAll(new AWS.SQS());
var lambda = Promise.promisifyAll(new AWS.Lambda());
var kinesis = Promise.promisifyAll(new AWS.Kinesis());
/**
* AWS Lambda entry point
*
* @param {Object} event lambda event object
* @param {Object} context lambda context
*/
exports.handler = function(event, context) {
//
Promise.resolve().then(function() {
// receive messages from SQS
var params = {
QueueUrl: config.get('sqs.url'),
MaxNumberOfMessages: config.get('sqs.max_number_of_messages'),
VisibilityTimeout: config.get('sqs.visiblity_timeout'),
WaitTimeSeconds: config.get('sqs.wait_time_seconds')
};
return sqs.receiveMessageAsync(params).then(function(data) {
return data.Messages || [];
});
}).each(function(data) {
// invoke lambda
var args = {Records: [
{
sqs: {
QueueUrl: config.get('sqs.url'),
MessageId: data.MessageId,
ReceiptHandle: data.ReceiptHandle,
MD5OfBody: data.MD5OfBody,
Body: data.Body
}
}
]};
var params = {
FunctionName: config.get('lambda.function_name'),
InvokeArgs: JSON.stringify(args)
};
return lambda.invokeAsyncAsync(params);
}).then(function() {
// put to kinesis stream for retry
var params = {
Records: [
{
PartitionKey: uuid.v4(),
Data: ''
}
],
StreamName: config.get('kinesis.stream_name')
};
return kinesis.putRecordsAsync(params);
}).then(function() {
context.done(null, 'success consumer.');
}).catch(function(err) {
context.done(err);
});
};
コード自体も簡単で、SQSからメッセージを受けとって、AWS Lambdaを起動して、Kinesisnに再プッシュしているだけになります。
ネットワーク障害とかで処理に失敗した場合はAWS Lambda自体のリトライ機能を使って、再実行を行います。
Kinesisをイベントソースにしているためリトライが10回できるので、それで十分かなという感じです。
あとはCloudWatchとかでSQSのMessagesVisibleを見て閾値を超えてたらKinesisに2回プッシュするとかすると、SQSの詰まり具合に合わせてスケールできて良いかもですね。
停止方法
AWS Lambda functionを削除するかKinesisのストリームを削除したら止まります。
感想
パターンとしてはおもしろいですが、Kinesis-Lambdaで安定的に無限ループさせるのが難しそうな雰囲気です。
監視とかパフォーマンスとかスケールとか考えだすと1年間安定運用させる自信がないです。
ちょっと誰か実運用に載せて感想文書いて欲しいです。