17
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Quad incAdvent Calendar 2017

Day 8

DynamoDBストリームをLambdaで処理した

Last updated at Posted at 2017-12-07

概要

DynamoDBストリームを使って、DynamoDBに登録されたデータを順にLambdaで処理する方法について書きます。
使ってみて分かったのは、ストリームは順番に処理されるため、1つのテーブルのストリームをトリガーにLambdaを実行した場合、Lambdaが全然並列に起動しませんでした。そこでテーブルを複数作成して並列度を上げています。

DynamoDB側

テーブルの作成

テーブルを複数作成するので、シェルで aws dynamodb create-table を回します。

テーブルを作成
#!/bin/bash

function create_table () {
    MAX=$1

    for i in $(seq 0 ${MAX}); do
        # テーブルを作成する
        aws dynamodb create-table --table-name Image_${i} \
            --attribute-definitions AttributeName=asset_no,AttributeType=S \
            --key-schema AttributeName=asset_no,KeyType=HASH \
            --provisioned-throughput ReadCapacityUnits=10,WriteCapacityUnits=5 \
            --stream-specification StreamEnabled=true,StreamViewType=NEW_IMAGE
    done
}

create_table 19

ストリームを使いたいので StreamEnabled=true にしています。
上記を実行すると asset_no という属性のみを持った Image_0Image_19 という名前のテーブルが作成されます。

Lambda側

次にLambdaファンクションを作成します。

Lambdaに渡ってくるストリームの中身

まずはストリームから取得できる情報を見てみます。

ストリームの中身
{
        "Records": [
            {
                "eventID": "cc7c37c3e5fb3fe3604d2bf688d5cfec",
                "eventName": "MODIFY",
                "eventVersion": "1.1",
                "eventSource": "aws:dynamodb",
                "awsRegion": "ap-northeast-1",
                "dynamodb": {
                    "ApproximateCreationDateTime": 1507705860,
                    "Keys": {
                        "asset_no": {
                            "S": "AAAAA_110427_0904"
                        }
                    },
                    "NewImage": {
                        "asset_no": {
                            "S": "AAAAA_110427_0904"
                        },
                        "asset_url": {
                            "S": "https://test.test.jp/images/AAAAA_110427_0904"
                        }
                    },
                    "SequenceNumber": "260900000000006039058650",
                    "SizeBytes": 190,
                    "StreamViewType": "NEW_IMAGE"
                },
                "eventSourceARN": "arn:aws:dynamodb:ap-northeast-1:XXXXXXXXXXXXXXXXX:table/Image/stream/2017-10-11T03:32:43.158"
            }
        ]
    }

Records[].dynamodb の下にデータが入っていますね。
データには作成時に設定したキー以外にも属性(asset_url)を設定しています。

Pythonでの取得方法

先ほどのストリームデータは lambda_handler に第1引数(event)で渡されるので、以下のように取得可能です。

ストリームの中身を取得
def lambda_handler(event, context):

    image = event['Records'][0]['dynamodb']['NewImage']

Dictに変換

扱いやすくするためにDictionaryに変換します。

dictに変換
from boto3.dynamodb.types import TypeDeserializer

deserializer = TypeDeserializer()

:

def deserialize(image):
    """
    dictに変換する
    """
    d = {}
    for key in image:
        d[key] = deserializer.deserialize(image[key])
    return d

まとめると以下のようになります。

Lambdaファンクション
# -*- coding: utf_8 -*-

from boto3.dynamodb.types import TypeDeserializer

deserializer = TypeDeserializer()


def lambda_handler(event, context):
    # 削除の場合は処理しない
    if event['Records'][0]['eventName'] == 'REMOVE':
        return

    image = event['Records'][0]['dynamodb']['NewImage']

    try:
        # eventをdictに変換
        item = deserialize(image)

        # 処理

    except Exception as e:
        raise e
    else:
        return {"result": "OK"}


def deserialize(image):
    """
    dictに変換する
    """
    d = {}
    for key in image:
        d[key] = deserializer.deserialize(image[key])
    return d

トリガーの作成

最後にLambdaファンクションのトリガー設定を行います。
こちらもシェルで処理しました。

トリガーの設定
#!/bin/bash

function create_trigger () {
    MAX=$1

    for i in $(seq 0 ${MAX}); do
        # ストリームのARNを取得する
        stream=`aws dynamodb describe-table --table-name Image_${i} | jq -r .Table.LatestStreamArn`

        sleep 1s

        # トリガーを作成する
        aws lambda create-event-source-mapping \
            --event-source-arn "${stream}" \
            --function-name image_downloader \
            --enabled \
            --batch-size 1 \
            --starting-position TRIM_HORIZON
    done
}

create_trigger 19

以上で、DynamoDBにデータが作成されると、ストリームをトリガーとしてLambdaが起動し処理されます。

17
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?