python3
DynamoDB
lambda

DynamoDBストリームをLambdaで処理した

概要

DynamoDBストリームを使って、DynamoDBに登録されたデータを順にLambdaで処理する方法について書きます。
使ってみて分かったのは、ストリームは順番に処理されるため、1つのテーブルのストリームをトリガーにLambdaを実行した場合、Lambdaが全然並列に起動しませんでした。そこでテーブルを複数作成して並列度を上げています。

DynamoDB側

テーブルの作成

テーブルを複数作成するので、シェルで aws dynamodb create-table を回します。

テーブルを作成
#!/bin/bash

function create_table () {
    MAX=$1

    for i in $(seq 0 ${MAX}); do
        # テーブルを作成する
        aws dynamodb create-table --table-name Image_${i} \
            --attribute-definitions AttributeName=asset_no,AttributeType=S \
            --key-schema AttributeName=asset_no,KeyType=HASH \
            --provisioned-throughput ReadCapacityUnits=10,WriteCapacityUnits=5 \
            --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE
    done
}

create_table 19

ストリームを使いたいので StreamEnabled=true にしています。
上記を実行すると asset_no という属性のみを持った Image_0Image_19 という名前のテーブルが作成されます。

Lambda側

次にLambdaファンクションを作成します。

Lambdaに渡ってくるストリームの中身

まずはストリームから取得できる情報を見てみます。

ストリームの中身
{
        "Records": [
            {
                "eventID": "cc7c37c3e5fb3fe3604d2bf688d5cfec",
                "eventName": "MODIFY",
                "eventVersion": "1.1",
                "eventSource": "aws:dynamodb",
                "awsRegion": "ap-northeast-1",
                "dynamodb": {
                    "ApproximateCreationDateTime": 1507705860,
                    "Keys": {
                        "asset_no": {
                            "S": "AAAAA_110427_0904"
                        }
                    },
                    "NewImage": {
                        "asset_no": {
                            "S": "AAAAA_110427_0904"
                        },
                        "asset_url": {
                            "S": "https://test.test.jp/images/AAAAA_110427_0904"
                        }
                    },
                    "SequenceNumber": "260900000000006039058650",
                    "SizeBytes": 190,
                    "StreamViewType": "NEW_IMAGE"
                },
                "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:XXXXXXXXXXXXXXXXX:table/Image/stream/2017-10-11T03:32:43.158"
            }
        ]
    }

Records[].dynamodb の下にデータが入っていますね。
データには作成時に設定したキー以外にも属性(asset_url)を設定しています。

Pythonでの取得方法

先ほどのストリームデータは lambda_handler に第1引数(event)で渡されるので、以下のように取得可能です。

ストリームの中身を取得
def lambda_handler(event, context):

    image = event['Records'][0]['dynamodb']['NewImage']

Dictに変換

扱いやすくするためにDictionaryに変換します。

dictに変換
from boto3.dynamodb.types import TypeDeserializer

deserializer = TypeDeserializer()

:

def deserialize(image):
    """
    dictに変換する
    """
    d = {}
    for key in image:
        d[key] = deserializer.deserialize(image[key])
    return d

まとめると以下のようになります。

Lambdaファンクション
# -*- coding: utf_8 -*-

from boto3.dynamodb.types import TypeDeserializer

deserializer = TypeDeserializer()


def lambda_handler(event, context):
    # 削除の場合は処理しない
    if event['Records'][0]['eventName'] == 'REMOVE':
        return

    image = event['Records'][0]['dynamodb']['NewImage']

    try:
        # eventをdictに変換
        item = deserialize(image)

        # 処理

    except Exception as e:
        raise e
    else:
        return {"result": "OK"}


def deserialize(image):
    """
    dictに変換する
    """
    d = {}
    for key in image:
        d[key] = deserializer.deserialize(image[key])
    return d

トリガーの作成

最後にLambdaファンクションのトリガー設定を行います。
こちらもシェルで処理しました。

トリガーの設定
#!/bin/bash

function create_trigger () {
    MAX=$1

    for i in $(seq 0 ${MAX}); do
        # ストリームのARNを取得する
        stream=`aws dynamodb describe-table --table-name Image_${i} | jq -r .Table.LatestStreamArn`

        sleep 1s

        # トリガーを作成する
        aws lambda create-event-source-mapping \
            --event-source-arn "${stream}" \
            --function-name image_downloader \
            --enabled \
            --batch-size 1 \
            --starting-position TRIM_HORIZON
    done
}

create_trigger 19

以上で、DynamoDBにデータが作成されると、ストリームをトリガーとしてLambdaが起動し処理されます。