- LocalStack上で動作するKinesisへのレコード登録をトリガーとしてLambdaを起動し、LambdaからDynamoDBへレコード登録を行う方法についてメモする。
構成
Kinesis準備
- ストリーム作成
aws kinesis create-stream --stream-name test-stream --shard-count 1 --endpoint-url http://localhost:4568 --profile localstack
- ストリーム確認
aws kinesis describe-stream --stream-name test-stream --endpoint-url http://localhost:4568 --profile localstack
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49623662862219041996029299625069687469926590353244160002"
}
}
],
"StreamARN": "arn:aws:kinesis:us-east-1:000000000000:stream/test-stream",
"StreamName": "test-stream",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": null,
"KeyId": null,
"StreamCreationTimestamp": null
}
}
DynamoDB準備
aws dynamodb create-table --table-name test-table --attribute-definitions AttributeName=Data,AttributeType=S --key-schema AttributeName=Data,KeyType=HASH --provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1 --endpoint-url=http://localhost:4569 --profile localstack
{
"TableDescription": {
"AttributeDefinitions": [
{
"AttributeName": "Data",
"AttributeType": "S"
}
],
"TableName": "test-table",
"KeySchema": [
{
"AttributeName": "Data",
"KeyType": "HASH"
}
],
"TableStatus": "ACTIVE",
"CreationDateTime": 1636171713.46,
"ProvisionedThroughput": {
"LastIncreaseDateTime": 0.0,
"LastDecreaseDateTime": 0.0,
"NumberOfDecreasesToday": 0,
"ReadCapacityUnits": 1,
"WriteCapacityUnits": 1
},
"TableSizeBytes": 0,
"ItemCount": 0,
"TableArn": "arn:aws:dynamodb:ddblocal:000000000000:table/test-table"
}
}
- 動作確認(データ登録)
aws dynamodb put-item --table-name test-table --item={\"Data\":{\"S\":\"fuga\"}} --endpoint-url=http://localhost:4569 --profile localstack
-
登録データ確認
aws dynamodb scan --table-name test-table --endpoint-url=http://localhost:4569 --profile localstack { "Items": [ { "Data": { "S": "fuga" } } ], "Count": 1, "ScannedCount": 1, "ConsumedCapacity": null }
Lambda準備
-
Lambda処理(
app.py)※Kinesisから取得したデータをDynamoDBに登録する。
from __future__ import print_function import base64 from boto3.session import Session # DynamoDB 接続設定 session = Session( aws_access_key_id='dummy', aws_secret_access_key='dummy', region_name='ap-northeast-1' ) dynamodb = session.resource( service_name='dynamodb', endpoint_url='http://localhost:4569' ) def lambda_handler(event, context): table = dynamodb.Table('test-table') for record in event['Records']: payload = base64.b64decode(record['kinesis']['data']) print('Decoded Payload:{}'.format(payload)) response = table.put_item( Item={ 'Data': payload } ) return "data put succeeded" -
アップロード
aws lambda create-function --function-name="test-kinesis-trigger" --runtime=python3.8 --role="arn:aws:iam::123456789012:role/service-role/lambda-sample-role" --handler=app.lambda_handler --zip-file fileb://app.zip --endpoint-url=http://localhost:4574 --profile=localstack-
更新したい場合
aws lambda update-function-code --function-name="test-kinesis-trigger" --zip-file fileb://app.zip --endpoint-url=http://localhost:4574 --profile=localstack
-
-
Kinesisをイベントトリガーに設定
aws lambda create-event-source-mapping --function-name test-kinesis-trigger \
--batch-size 500 \
--event-source-arn arn:aws:kinesis:us-east-1:000000000000:stream/test-stream --endpoint-url=http://localhost:4574 --profile=localstack
{
"UUID": "0ea9a7f7-4584-4d35-83c1-2390739aa442",
"StartingPosition": "LATEST",
"BatchSize": 100,
"EventSourceArn": "arn:aws:kinesis:us-east-1:000000000000:stream/test-stream",
"FunctionArn": "arn:aws:lambda:us-east-1:000000000000:function:test-kinesis-trigger",
"LastModified": 1636171993.0,
"LastProcessingResult": "OK",
"State": "Enabled",
"StateTransitionReason": "User action"
}
動作確認
-
ストリームにレコードを入力
aws kinesis put-record --stream-name test-stream --partition-key 123 --data testdata --endpoint-url http://localhost:4568 --profile localstack { "ShardId": "shardId-000000000000", "SequenceNumber": "49623662862219041996029299625069687469926616397925842946" } -
DynamoDB確認
aws dynamodb scan --table-name test-table --endpoint-url=http://localhost:4569 --profile localstack { "Items": [ { "Data": { "S": "testdata" } }, { "Data": { "S": "fuga" } } ], "Count": 2, "ScannedCount": 2, "ConsumedCapacity": null }
