Node.js
AWS
DynamoDB
lambda
DynamoDBStream

DynamoDB StreamをトリガーにしてLambdaを実行する

DynamoDB Streamとは

DynamoDB Streamは、DynamoDBに対する項目の追加、変更、削除をイベントとして検出できる機能です。DynamoDB Streamを使うことで、項目が追加・変更された時にプッシュ通知を飛ばしたりといった実装が可能になります。今回は、そのイベントをLambdaで受け取ってみたいと思います。

参考:DynamoDB ストリーム を使用したテーブルアクティビティのキャプチャ

DynamoDB側でStreamを有効化

先んず、DynamoDBの概要のタブにある「ストリームの管理」から、Streamを有効化します。新旧イメージを選択すると、項目の変更時にLambdaが項目のBefore&Afterの両方を受け取ることができます。
スクリーンショット 2018-02-14 20.52.33.png

DynamoDB Streamを受け取るためにLambdaに必要な権限

LambdaがDynamoDB Streamを受け取るためには、読み取りたいテーブルのストリームに対して下記の4つの権限を許可しておく必要があります。ストリームのARNは、arn:aws:dynamodb:ap-northeast-1:XXXXXXXXX:table/テーブル名/stream/*とすることで全てのストリームを受け取ることができます。

  • GetShardIterator
  • GetRecords
  • ListStream
  • DescribeStream

Lambda側からDynamoDB Streamのトリガーを追加

Lambdaのページの「左側からトリガーを追加」というところで、DynamoDBを選択すると設定画面が出てきます。
スクリーンショット 2018-02-14 21.00.39.png

設定ではStreamを受け取りたいテーブルとバッチサイズを指定します。開始位置は最新と水平を選択できます。ここでは最新を選択しています。左下の追加をクリックしたあとに変更を保存すると、トリガーとして追加されます。
スクリーンショット 2018-02-14 21.00.53.png

Lambda関数の中身の雛形

event.Recordsが配列としてイベントを受け取っているので、forEachで一つ一つ処理をしています。eventNameに「INSERT」か「MODIFY」か「REMOVE」が入っていて、変更の種類を教えてくれます。

index.js
exports.handler = (event, context, callback) => {

    event.Records.forEach((record) => {
        console.log('イベント種別:', record.eventName);
        console.log('DynamoDB Record: %j', record.dynamodb);

        if(record.eventName == 'INSERT'){
            //項目が追加された時の処理
            const newItem = record.dynamodb.NewImage;

        }else if(record.eventName == 'MODIFY'){
            //項目が変更された時の処理
            const oldItem = record.dynamodb.OldImage;//変更前
            const newItem = record.dynamodb.NewImage;//変更後

        }else if(record.eventName == 'REMOVE'){
            //項目が削除された時の処理
            const deletedItem = record.dynamodb.OldImage

        }else{

        }
    });

};

結果

DynamoDBのコンソール画面から適当に項目を追加、編集、削除してみましょう。下記のログは項目を編集(名前を山田太郎→佐藤太郎に変更)した時のログです。NewImageとOldImageの中にBefore&Afterが入っているのがわかると思います。SとかNとかはデータ型を表しています。

{
    "ApproximateCreationDateTime": 1518610740,
    "Keys": {
        "id": {
            "S": "sample"
        },
        "create_at": {
            "N": "1518534000"
        }
    },
    "NewImage": {
        "name": {
            "S": "佐藤太郎"
        },
        "id": {
            "S": "sample"
        },
        "create_at": {
            "N": "1518534000"
        }
    },
    "OldImage": {
        "name": {
            "S": "山田太郎"
        },
        "id": {
            "S": "sample"
        },
        "create_at": {
            "N": "1518534000"
        }
    },
    "SequenceNumber": "24000000000007967212728",
    "SizeBytes": 98,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
}