DynamoDB Stream から RDSに登録する処理を書いていたところ、
微妙に件数にズレが出ていることが発覚した。
確率としては10000件に10件程度レコード抜けが発生してしまう。。
その時の環境は以下のとおり。
DynamoDB Stream周りの設定
項目 | 値 |
---|---|
Streamのレコード設定 | 「新旧レコード」をストリームに流す |
Batch Size | 100 |
Starting position | Trim horizon |
Lambdaファンクション(抜粋)
// ..snip
exports.handler = function(event, context){
// レコードごとの処理
event.Records.forEach(function(record) {
// 何かの処理
hogehogefunc(function(err, data) {
if (err) {
console.error(err)
context.fail(err)
} else {
// 何かの処理
fugafugafunc(function(err, data) {
if (err) {
console.error(err)
context.fail(err)
} else {
context.succeed("Successfully processed " + record_length + " records.");
}
});
}
});
});
};
原因
検証してもよくわからなかったのでサポートに聞いてみたところ
「きちんと全部処理してからsucceedしてね」
とアドバイスいただいた。
おそらくバッチサイズを1以上にして、複数レコードを返ってきた時に
現状のコードだと処理漏れしてしまうことがある?様子。
改善後のコード
// ..snip
exports.handler = function(event, context){
// レコード数保持しとく
var record_length = event.Records.length
var proc_count = 0
// 1レコードごとの終了処理
function finish_handler(err) {
proc_count++
if(err) {
// エラー出たら終了しちゃう
// 場合によっては処理続行して、レコードの一番最後で判定したほうが良いかも
console.error(err)
context.fail(err)
} else if(proc_count == record_length) {
// 全レコードを処理したらイベント終了とする
context.succeed("Successfully processed " + record_length + " records.");
}
}
// レコードごとの処理
event.Records.forEach(function(record) {
// 何かの処理
hogehogefunc(function(err, data) {
if (err) {
finish_handler(err)
} else {
// 何かの処理
fugafugafunc(function(err, data) {
finish_handler(err)
});
}
});
});
};
こちらのコードに置き換えたところ処理漏れはなくなりました :)
漏れなく処理したい時にはきちんと全レコード判定する処理を書きましょう〜、というお話でした。