LambdaのSQSトリガーってありそうで、なかったのですが、本日できるようになったそうです。
AWS Lambda Adds Amazon Simple Queue Service to Supported Event Sources
AWS LambdaがSQSをイベントソースとしてサポートしました!
ちょっと気になる&前に作った機能置き換えられるんじゃない?ってことで、ちょっと試してみました。
前提条件としては、FIFOには対応していないらしいですが、
相変わらず東京リージョンにはないので、ここは問題ないです。
バッチサイズ(メッセージの最大取得数)は10のようです。
検証準備
とりあえずトリガーにするSQSのキューを作成します。
可視性タイムアウトは、デフォルト値(30秒)にしています。
起動するLambdaは最近はNode.js使いなので、Node.js(v8.10)で書いてみます。
こちら、メッセージ内の文字列で処理分けている以外は、特に何もしていません。
exports.handler = async (event, context, callback) => {
console.log('Received event:', JSON.stringify(event, null, 2));
let body = event.Records[0].body;
if ( body.match(/Wait/)) {
// メッセージBodyに「Wait」が含まれていたら、40秒待機(lambdaがタイムアウトします。)
console.log("Wait.");
setTimeout(callback, 40000);
callback(null, 'Wait -> Success.'); // ここ出ないんですけどね。
} else if ( body.match(/Error/)) {
// メッセージBodyに「Error」が含まれていたら、エラーで終了
console.log("Error");
callback('Error.')
} else {
// それ以外は正常終了
console.log('Success.');
callback(null, 'Success.');
}
};
そのまま動くと思いますが、エラーの場合、ずっと起動し続けますので、キューからメッセージ削除するなどしてください。
このLambdaで使う実行ロールには、SQSのフルアクセス権限つけています。
最初、Lambdaのタイムアウトを60秒にしていたのですが、
保存しようとしたところ、エラーになりました。
Lambdaのタイムアウトは、可視性タイムアウト以下にしないとダメそうです。
確かに、Lambdaの処理中に、可視性タイムアウトが切れたら、再度Lambdaが実行されることになるので、そこは納得できます。
(ちなみに、イベントソースから削除しようとしても、Lambdaのタイムアウト値が長いと、このエラー出るんですよね。。。)
検証
とりあえず検証なので、SQSのコンソール上で、メッセージを送信し、
Lambdaが実行されること、
Lambdaの正常終了時、エラー時、Lambdaタイムアウト時の挙動を確認しました。
- 正常終了時はキューからメッセージが消える
- エラー時、Lambdaタイムアウト時は、キューにメッセージは残り、不活性タイムアウトが切れたら、再度Lambdaが実行される
エラー時の実行ログです。可視性タイムアウト切れ後(設定は30秒)に再度実行されていることがわかります。
13:25:48 START RequestId: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx Version: $LATEST
13:25:48 2018-06-29T04:25:48.620Z xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx Received event:
{
"Records": [
{
"messageId": "zzzzzzzz-zzzz-zzzz-zzzz-zzzzzzzzzzzz",
"receiptHandle": "",
"body": "test. Error",
・・・
}
]
}
13:25:48 2018-06-29T04:25:48.639Z xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx Error
13:25:48 2018-06-29T04:25:48.640Z xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
{
"errorMessage": "Error."
}
13:25:48 END RequestId: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
13:25:48 REPORT RequestId: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx Duration: 55.12 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 20 MB
13:26:18 START RequestId: yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy Version: $LATEST
13:26:18 2018-06-29T04:26:18.149Z yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy Received event:
{
"Records": [
{
"messageId": "zzzzzzzz-zzzz-zzzz-zzzz-zzzzzzzzzzzz",
"receiptHandle": "",
"body": "test. Error",
・・・
}
]
}
13:26:18 2018-06-29T04:26:18.159Z yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy Error
2018-06-29T04:26:18.159Z yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy
{
"errorMessage": "Error."
}
13:26:18 END RequestId: yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy
13:26:18 REPORT RequestId: yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy Duration: 10.81 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 20 MB
今の所、
イベント発火しないとか、
1メッセージに対して、1つ以上のLambdaが実行される、
メッセージが削除されない等の事象は起きていないですが、
この辺の考慮は実装しておいたほうがいいのかもしれません。
receiptHandleがeventオブジェクト内にあるので、処理が終わったら、
receiptHandle使って明示的にキューからメッセージを削除ってこともできるかとおもいます。
(以下追記)
これ無限にリトライし続けるのかな?その辺制御できないのかなーと聞かれたので、
ついでに確認。
これは、SQSの再処理ポリシーを設定することで、リトライ回数の制御ができるそうです。
再処理ポリシーを設定すると、
最大受信数(maxReceiveCount)までリトライを繰り返すと、メッセージは「Dead Letter Queue」(DLQ)として指定したキューに移動するので、以後リトライすることはなくなります。
Amazon SQS デッドレターキュー
(ちょっとSQSの勉強にもなりました)
DLQに指定したキューの方でLambdaを実行して、Slackに通知するとかもできるかもしれませんね。
まとめ
前に、Lambdaを5分間隔でポーリング実行して、SQSのキューにメッセージがあったら、処理をするみたいな関数を作ったことがあるのですが、ポーリング処理をなくすことができるんじゃないかと思いました。
実はキューから取得するのが、ちょっと面倒で、SQSに苦手意識があったんですが、この機能のおかげで、苦手意識なくなりそうですw
4/2 追記
LambdaのBlackBeltをみていて、気づいたので修正。
ポーリングベースで、ストリームベースではないイベントソースかつ同期処理なので、
- イベント発火しない
- 1メッセージに対して、1つ以上のLambdaが実行される
は発生しない。