はじめに
DynamoDB Streamsを初めて使用したので、設定方法をまとめておきます。
使用した理由
DynamoDBのテーブルに保存されたデータを使用した構築済みの仕組みがあり、ここに同じデータから別の仕組みを一時的に追加する必要がありました。
追加した構成
DynamoDBのテーブルにDynamoDB Streamsを設定し、テーブルに追加されるデータを別処理にストリーミングさせる構成としました。既存の構成では、テーブルの設定を変更するだけで、何も影響がありません。パフォーマンスへの影響もありません。
実際の設定
実際に設定をしてみます。
DynamoDB側
コンソール > DynamoDB > テーブル > 設定対象のテーブル とすすみます。
「エクスポートおよびストリーム」タブの「DynamoDB ストリームの詳細」で機能をオンにします。
ストリームしたいのは追加データだけなので、新しいイメージとしました。
Lambdaの設定
Lambdaトリガーが起動されると、ストリームされたDynamoDBデータレコードをログに出力するようにコードを作成します。
import json
import logging
import boto3
# ロガーの設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# CloudWatch Logs クライアントの初期化
cloudwatch_logs = boto3.client('logs')
def lambda_handler(event, context):
# TODO implement
logger.info(json.dumps(event))
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
Lambda関数がDynamoDBのデータレコードを読み込むための権限設定を行います。
※画面はFUllAccessにしていますが、実際に行う場合は、必要最小限の権限にしてください。
フィルターの設定
ここまでで、DynamoDBのテーブルに追加、変更、削除されたデータはすべて連携されるようになります。
今回必要なのは追加されたデータで、かつ、特定のフィールドの値を条件としたいので、フィルターを設定します。
{
"eventName": ["INSERT"],
"dynamodb": {
"NewImage": {
"field_moji": { "S": ["test1"] },
"field_moji2": { "S": [{"prefix": "{\"c1\": \"test2\","}] }
}
}
}
条件の説明:
以下の3つの条件が一致したレコードだけが、Lambda関数にストリームされます。
- レコードの種類が「追加」
- フィールド「field_moji」の値が、「test1」
- フィールド「field_moji2」の値が、「{"c1": "test2",」で始まる。(上記の条件内にはエスケープ文字が含まれます)
結果
ログに出力されたのは3つの条件を満たす Key1の値が「123457」のレコードだけとなりましたので、狙ったフィルターができていることがわかりました。
おまけ
AWSの公式では、フィルタリング条件に「で終わる」と「ワイルドカード」の記載がありません。
EventBridgeでは、「で終わる」に相当するサフィックスマッチングや「ワイルドカード」が使用できるようなので、DynamoDB Streamsで使用できるか試してみました。
「で終わる」
設定可能でした。フィルタリングも動作しました。
{
"eventName": ["INSERT"],
"dynamodb": {
"NewImage": {
"field_moji": { "S": ["test1"] },
"field_moji2": { "S": [{"suffix": "end111"}] }
}
}
}
「ワイルドカード」
設定不可でした。何パターンか試しましたがダメでした。
{
"eventName": ["INSERT"],
"dynamodb": {
"NewImage": {
"field_moji": { "S": ["test1"] },
"field_moji2": { "S": [{"wildcard": "test*"}] }
}
}
}
特定の文字が含まれる を実現したい場合は、Lambda関数のコード内で処理する必要があります。