概要
DynamoDB Local を使って、開発環境のローカルマシンで DynamoDB を動かすことができるが、DynamoDB Stream を使いたい場合はポーリングする仕組みが必要。その方法のメモ。
Stream を監視する Python のソース
app-stream.py
import boto3
import os
import time
from pprint import pprint
# 対象のテーブル
table_names = [
'XXX',
'YYY',
]
# 初期化
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodbstreams.html
boto3.setup_default_session(
aws_access_key_id='dummy', # ローカルといえども何かを送る必要あるので値は適当
aws_secret_access_key='dummy', # 同上
region_name='ap-northeast-1',
)
endpoint_url = os.getenv('DYNAMODB_ENDPOINT', 'http://localhost:8001')
cli_dynamo = boto3.client('dynamodb', endpoint_url=endpoint_url)
cli_streams = boto3.client('dynamodbstreams', endpoint_url=endpoint_url)
# シャードイテレーター管理用のdict生成
shard_iters = {}
for table_name in table_names:
# 最新のDynamoDB Streams ARN取得
stream_arn = cli_dynamo.describe_table(TableName=table_name)['Table']['LatestStreamArn']
print(f"stream arn: {stream_arn}")
# 取得したARNからストリームに関して取得
shards = cli_streams.describe_stream(StreamArn=stream_arn)['StreamDescription']['Shards']
# 具体的なシャードイテレーター取得
for shard in shards:
shard_id = shard['ShardId']
shard_iters[shard_id] = cli_streams.get_shard_iterator(
StreamArn=stream_arn,
ShardId=shard_id,
ShardIteratorType='LATEST',
)['ShardIterator']
def lambda_handler(event, context):
# 仮。ここでやりたい処理を書く
pprint(event)
# ストリームを無限ループで読む
while True:
print(f"shard_iters size: {len(shard_iters)}")
# 開いているものがなかったら終了する
if len(shard_iters) == 0:
break
# イテレーターごとに更新がないか読む
for shard_id, cur_iter in [*shard_iters.items()]:
# イテレーターが閉じられているのを確認
if cur_iter is None:
print(f"closed shard: {shard_id}")
del shard_iters[shard_id] # 現在のイテレーターをテーブルから削除
continue
records = cli_streams.get_records(ShardIterator=cur_iter)
# イテレーターを更新
# 上で処理しているが `NextShardIterator` が None の場合はそのイテレーターは閉じられた
next_iter = records.get('NextShardIterator')
# 取得した次のイテレーターに変更がないならスキップ
if next_iter == cur_iter:
continue
shard_iters[shard_id] = next_iter
# レコードが取得できなかった場合はスキップ
if len(records['Records']) == 0:
continue
print(f"Lambda event data count: {len(records['Records'])}")
# AWS Lambda 呼び出し
lambda_handler(event=records, context=None)
# スリープを入れる
time.sleep(3)
テーブル作成~データ投入
import boto3
from botocore import exceptions
import random
import time
from pprint import pprint
# 対象のテーブル
table_names = [
'Xxx',
'Yyy',
]
# 初期化
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html
boto3.setup_default_session(
aws_access_key_id='dummy', # ローカルといえども何かを送る必要あるので値は適当
aws_secret_access_key='dummy', # 同上
region_name='ap-northeast-1',
)
endpoint_url = 'http://localhost:8001'
cli_dynamo = boto3.client('dynamodb', endpoint_url=endpoint_url)
# テスト用にテーブル作成
for table_name in table_names:
# noinspection PyBroadException
try:
res = cli_dynamo.delete_table(
TableName=table_name,
)
print(f"[delete_table] table_name={table_name}")
pprint(res)
print("")
except Exception:
pass
res = cli_dynamo.create_table(
TableName=table_name,
AttributeDefinitions=[
{"AttributeName": "fPk", "AttributeType": "S"},
{"AttributeName": "fSk", "AttributeType": "S"}
],
KeySchema=[
{"AttributeName": "fPk", "KeyType": "HASH"},
{"AttributeName": "fSk", "KeyType": "RANGE"}
],
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"}
)
print(f"[create_table] table_name={table_name}")
pprint(res)
print("")
# 無限ループで定期投入
while True:
# テスト用にデータ投入
table_name = random.choice(table_names)
item = {
"fPk": {"S": f"{table_name}-1"},
"fSk": {"S": f"{random.randint(1, 10)}"},
"val1": {"S": f"{table_name}-{random.randint(1, 10)}"},
}
res = cli_dynamo.put_item(
TableName=table_name,
Item=item,
)
print(f"[put_item] table_name={table_name}, item={item}")
print(res)
print("")
# スリープを入れる
time.sleep(2)
docker-compose
dockerfile-python/Dockerfile
FROM public.ecr.aws/lambda/python:3.9
RUN pip install boto3
ENTRYPOINT ["python"]
CMD ["/src/app.py"]
docker-compose.yml
version: '3.8'
services:
# DymanoDB
dynamodb:
command: "-jar DynamoDBLocal.jar -sharedDb -port 8001 -inMemory"
image: "amazon/dynamodb-local:latest"
ports:
- "8001:8001"
working_dir: /home/dynamodblocal
# DynamoDB に定期的にデータ投入
insert:
depends_on: ["dynamodb"]
build: ./dockerfile-python
environment:
- DYNAMODB_ENDPOINT=http://dynamodb:8001
volumes:
- ../backend-py/src/app-insert.py:/src/app.py
# DynamoDB Stream を監視
stream:
depends_on: ["dynamodb"]
build: ./dockerfile-python
environment:
- DYNAMODB_ENDPOINT=http://dynamodb:8001
volumes:
- ../backend-py/src/app-stream.py:/src/app.py
実行
docker-compose build
docker-compose up dynamodb
docker-compose run --rm insert
-
docker-compose run --rm stream
(別の窓で)