概要
DynamoDBストリームを使って、DynamoDBに登録されたデータを順にLambdaで処理する方法について書きます。
使ってみて分かったのは、ストリームは順番に処理されるため、1つのテーブルのストリームをトリガーにLambdaを実行した場合、Lambdaが全然並列に起動しませんでした。そこでテーブルを複数作成して並列度を上げています。
DynamoDB側
テーブルの作成
テーブルを複数作成するので、シェルで aws dynamodb create-table
を回します。
#!/bin/bash
function create_table () {
MAX=$1
for i in $(seq 0 ${MAX}); do
# テーブルを作成する
aws dynamodb create-table --table-name Image_${i} \
--attribute-definitions AttributeName=asset_no,AttributeType=S \
--key-schema AttributeName=asset_no,KeyType=HASH \
--provisioned-throughput ReadCapacityUnits=10,WriteCapacityUnits=5 \
--stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE
done
}
create_table 19
ストリームを使いたいので StreamEnabled=true
にしています。
上記を実行すると asset_no
という属性のみを持った Image_0
〜 Image_19
という名前のテーブルが作成されます。
Lambda側
次にLambdaファンクションを作成します。
Lambdaに渡ってくるストリームの中身
まずはストリームから取得できる情報を見てみます。
{
"Records": [
{
"eventID": "cc7c37c3e5fb3fe3604d2bf688d5cfec",
"eventName": "MODIFY",
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": "ap-northeast-1",
"dynamodb": {
"ApproximateCreationDateTime": 1507705860,
"Keys": {
"asset_no": {
"S": "AAAAA_110427_0904"
}
},
"NewImage": {
"asset_no": {
"S": "AAAAA_110427_0904"
},
"asset_url": {
"S": "https://test.test.jp/images/AAAAA_110427_0904"
}
},
"SequenceNumber": "260900000000006039058650",
"SizeBytes": 190,
"StreamViewType": "NEW_IMAGE"
},
"eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:XXXXXXXXXXXXXXXXX:table/Image/stream/2017-10-11T03:32:43.158"
}
]
}
Records[].dynamodb
の下にデータが入っていますね。
データには作成時に設定したキー以外にも属性(asset_url)を設定しています。
Pythonでの取得方法
先ほどのストリームデータは lambda_handler に第1引数(event)で渡されるので、以下のように取得可能です。
def lambda_handler(event, context):
image = event['Records'][0]['dynamodb']['NewImage']
Dictに変換
扱いやすくするためにDictionaryに変換します。
from boto3.dynamodb.types import TypeDeserializer
deserializer = TypeDeserializer()
:
def deserialize(image):
"""
dictに変換する
"""
d = {}
for key in image:
d[key] = deserializer.deserialize(image[key])
return d
まとめると以下のようになります。
# -*- coding: utf_8 -*-
from boto3.dynamodb.types import TypeDeserializer
deserializer = TypeDeserializer()
def lambda_handler(event, context):
# 削除の場合は処理しない
if event['Records'][0]['eventName'] == 'REMOVE':
return
image = event['Records'][0]['dynamodb']['NewImage']
try:
# eventをdictに変換
item = deserialize(image)
# 処理
except Exception as e:
raise e
else:
return {"result": "OK"}
def deserialize(image):
"""
dictに変換する
"""
d = {}
for key in image:
d[key] = deserializer.deserialize(image[key])
return d
トリガーの作成
最後にLambdaファンクションのトリガー設定を行います。
こちらもシェルで処理しました。
#!/bin/bash
function create_trigger () {
MAX=$1
for i in $(seq 0 ${MAX}); do
# ストリームのARNを取得する
stream=`aws dynamodb describe-table --table-name Image_${i} | jq -r .Table.LatestStreamArn`
sleep 1s
# トリガーを作成する
aws lambda create-event-source-mapping \
--event-source-arn "${stream}" \
--function-name image_downloader \
--enabled \
--batch-size 1 \
--starting-position TRIM_HORIZON
done
}
create_trigger 19
以上で、DynamoDBにデータが作成されると、ストリームをトリガーとしてLambdaが起動し処理されます。