LoginSignup
0
0

More than 1 year has passed since last update.

DynamoDB Local で stream を動かす方法 (Python)

Posted at

概要

DynamoDB Local を使って、開発環境のローカルマシンで DynamoDB を動かすことができるが、DynamoDB Stream を使いたい場合はポーリングする仕組みが必要。その方法のメモ。

Stream を監視する Python のソース

app-stream.py
import boto3
import os
import time
from pprint import pprint

# 対象のテーブル
table_names = [
    'XXX',
    'YYY',
]

# 初期化
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodbstreams.html
boto3.setup_default_session(
    aws_access_key_id='dummy',  # ローカルといえども何かを送る必要あるので値は適当
    aws_secret_access_key='dummy',  # 同上
    region_name='ap-northeast-1',
)
endpoint_url = os.getenv('DYNAMODB_ENDPOINT', 'http://localhost:8001')
cli_dynamo = boto3.client('dynamodb', endpoint_url=endpoint_url)
cli_streams = boto3.client('dynamodbstreams', endpoint_url=endpoint_url)

# シャードイテレーター管理用のdict生成
shard_iters = {}
for table_name in table_names:
    # 最新のDynamoDB Streams ARN取得
    stream_arn = cli_dynamo.describe_table(TableName=table_name)['Table']['LatestStreamArn']
    print(f"stream arn: {stream_arn}")

    # 取得したARNからストリームに関して取得
    shards = cli_streams.describe_stream(StreamArn=stream_arn)['StreamDescription']['Shards']

    # 具体的なシャードイテレーター取得
    for shard in shards:
        shard_id = shard['ShardId']
        shard_iters[shard_id] = cli_streams.get_shard_iterator(
            StreamArn=stream_arn,
            ShardId=shard_id,
            ShardIteratorType='LATEST',
        )['ShardIterator']


def lambda_handler(event, context):
    # 仮。ここでやりたい処理を書く
    pprint(event)


# ストリームを無限ループで読む
while True:
    print(f"shard_iters size: {len(shard_iters)}")
    # 開いているものがなかったら終了する
    if len(shard_iters) == 0:
        break

    # イテレーターごとに更新がないか読む
    for shard_id, cur_iter in [*shard_iters.items()]:

        # イテレーターが閉じられているのを確認
        if cur_iter is None:
            print(f"closed shard: {shard_id}")
            del shard_iters[shard_id]  # 現在のイテレーターをテーブルから削除
            continue

        records = cli_streams.get_records(ShardIterator=cur_iter)
        # イテレーターを更新
        # 上で処理しているが `NextShardIterator` が None の場合はそのイテレーターは閉じられた
        next_iter = records.get('NextShardIterator')

        # 取得した次のイテレーターに変更がないならスキップ
        if next_iter == cur_iter:
            continue

        shard_iters[shard_id] = next_iter

        # レコードが取得できなかった場合はスキップ
        if len(records['Records']) == 0:
            continue

        print(f"Lambda event data count: {len(records['Records'])}")

        # AWS Lambda 呼び出し
        lambda_handler(event=records, context=None)

    # スリープを入れる
    time.sleep(3)

テーブル作成~データ投入

import boto3
from botocore import exceptions
import random
import time
from pprint import pprint

# 対象のテーブル
table_names = [
    'Xxx',
    'Yyy',
]

# 初期化
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html
boto3.setup_default_session(
    aws_access_key_id='dummy',  # ローカルといえども何かを送る必要あるので値は適当
    aws_secret_access_key='dummy',  # 同上
    region_name='ap-northeast-1',
)
endpoint_url = 'http://localhost:8001'
cli_dynamo = boto3.client('dynamodb', endpoint_url=endpoint_url)

# テスト用にテーブル作成
for table_name in table_names:
    # noinspection PyBroadException
    try:
        res = cli_dynamo.delete_table(
            TableName=table_name,
        )
        print(f"[delete_table] table_name={table_name}")
        pprint(res)
        print("")
    except Exception:
        pass

    res = cli_dynamo.create_table(
        TableName=table_name,
        AttributeDefinitions=[
            {"AttributeName": "fPk", "AttributeType": "S"},
            {"AttributeName": "fSk", "AttributeType": "S"}
        ],
        KeySchema=[
            {"AttributeName": "fPk", "KeyType": "HASH"},
            {"AttributeName": "fSk", "KeyType": "RANGE"}
        ],
        ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
        StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"}
    )
    print(f"[create_table] table_name={table_name}")
    pprint(res)
    print("")

# 無限ループで定期投入
while True:
    # テスト用にデータ投入
    table_name = random.choice(table_names)
    item = {
        "fPk": {"S": f"{table_name}-1"},
        "fSk": {"S": f"{random.randint(1, 10)}"},
        "val1": {"S": f"{table_name}-{random.randint(1, 10)}"},
    }
    res = cli_dynamo.put_item(
        TableName=table_name,
        Item=item,
    )
    print(f"[put_item] table_name={table_name}, item={item}")
    print(res)
    print("")

    # スリープを入れる
    time.sleep(2)

docker-compose

dockerfile-python/Dockerfile
FROM public.ecr.aws/lambda/python:3.9
RUN pip install boto3
ENTRYPOINT ["python"]
CMD ["/src/app.py"]
docker-compose.yml
version: '3.8'
services:
  # DymanoDB
  dynamodb:
    command: "-jar DynamoDBLocal.jar -sharedDb -port 8001 -inMemory"
    image: "amazon/dynamodb-local:latest"
    ports:
      - "8001:8001"
    working_dir: /home/dynamodblocal
  # DynamoDB に定期的にデータ投入
  insert:
    depends_on: ["dynamodb"]
    build: ./dockerfile-python
    environment:
      - DYNAMODB_ENDPOINT=http://dynamodb:8001
    volumes:
      - ../backend-py/src/app-insert.py:/src/app.py
  # DynamoDB Stream を監視
  stream:
    depends_on: ["dynamodb"]
    build: ./dockerfile-python
    environment:
      - DYNAMODB_ENDPOINT=http://dynamodb:8001
    volumes:
      - ../backend-py/src/app-stream.py:/src/app.py

実行

  1. docker-compose build
  2. docker-compose up dynamodb
  3. docker-compose run --rm insert
  4. docker-compose run --rm stream (別の窓で)
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0