- A memo on how to launch a Lambda function when a record is put to a Kinesis stream running on LocalStack, and have that Lambda write the record to DynamoDB.
Architecture
Kinesis (test-stream) -> Lambda (test-kinesis-trigger) -> DynamoDB (test-table)
Kinesis setup
- Create the stream
aws kinesis create-stream --stream-name test-stream --shard-count 1 --endpoint-url http://localhost:4568 --profile localstack
- Verify the stream
aws kinesis describe-stream --stream-name test-stream --endpoint-url http://localhost:4568 --profile localstack
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49623662862219041996029299625069687469926590353244160002"
}
}
],
"StreamARN": "arn:aws:kinesis:us-east-1:000000000000:stream/test-stream",
"StreamName": "test-stream",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": null,
"KeyId": null,
"StreamCreationTimestamp": null
}
}
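Two fields of this output matter later: StreamStatus should be ACTIVE before records are put, and StreamARN is the value passed as --event-source-arn when the trigger is created. As a minimal sketch (the helper name summarize_stream is made up for this memo, and the JSON is a trimmed copy of the output above), both can be pulled out like this:

```python
import json

# Trimmed copy of the describe-stream output shown above.
DESCRIBE_OUTPUT = """
{
  "StreamDescription": {
    "Shards": [{"ShardId": "shardId-000000000000"}],
    "StreamARN": "arn:aws:kinesis:us-east-1:000000000000:stream/test-stream",
    "StreamName": "test-stream",
    "StreamStatus": "ACTIVE"
  }
}
"""


def summarize_stream(describe_json):
    """Return (stream ARN, is the stream ACTIVE, list of shard IDs)."""
    desc = json.loads(describe_json)["StreamDescription"]
    shard_ids = [shard["ShardId"] for shard in desc["Shards"]]
    return desc["StreamARN"], desc["StreamStatus"] == "ACTIVE", shard_ids


arn, is_active, shard_ids = summarize_stream(DESCRIBE_OUTPUT)
print(arn, is_active, shard_ids)
```

In practice the JSON text would come from the CLI command's standard output rather than from a constant.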
DynamoDB setup
- Create the table
aws dynamodb create-table --table-name test-table --attribute-definitions AttributeName=Data,AttributeType=S --key-schema AttributeName=Data,KeyType=HASH --provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1 --endpoint-url=http://localhost:4569 --profile localstack
{
"TableDescription": {
"AttributeDefinitions": [
{
"AttributeName": "Data",
"AttributeType": "S"
}
],
"TableName": "test-table",
"KeySchema": [
{
"AttributeName": "Data",
"KeyType": "HASH"
}
],
"TableStatus": "ACTIVE",
"CreationDateTime": 1636171713.46,
"ProvisionedThroughput": {
"LastIncreaseDateTime": 0.0,
"LastDecreaseDateTime": 0.0,
"NumberOfDecreasesToday": 0,
"ReadCapacityUnits": 1,
"WriteCapacityUnits": 1
},
"TableSizeBytes": 0,
"ItemCount": 0,
"TableArn": "arn:aws:dynamodb:ddblocal:000000000000:table/test-table"
}
}
- Sanity check (put an item)
aws dynamodb put-item --table-name test-table --item={\"Data\":{\"S\":\"fuga\"}} --endpoint-url=http://localhost:4569 --profile localstack
- Check the stored item
aws dynamodb scan --table-name test-table --endpoint-url=http://localhost:4569 --profile localstack
{
"Items": [
{
"Data": {
"S": "fuga"
}
}
],
"Count": 1,
"ScannedCount": 1,
"ConsumedCapacity": null
}
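The --item argument used in the put-item command is DynamoDB's attribute-value JSON, where each value is wrapped with its type ("S" for string). Escaping it by hand is error-prone, so here is a rough sketch that generates it instead. The helper to_string_item is hypothetical and only handles string attributes; shlex.quote assumes a POSIX-style shell.

```python
import json
import shlex


def to_string_item(attrs):
    """Wrap plain str values in DynamoDB's {"S": value} attribute-value form."""
    return {name: {"S": value} for name, value in attrs.items()}


# Compact separators keep the JSON free of spaces, like the command above.
item_json = json.dumps(to_string_item({"Data": "fuga"}), separators=(",", ":"))
print(item_json)

# Quoted form that can be pasted after "aws dynamodb put-item ...".
print("--item " + shlex.quote(item_json))
```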
Lambda setup
- Lambda handler (app.py)
Note: it writes the data received from Kinesis to DynamoDB.
import base64
from boto3.session import Session

# DynamoDB connection settings (dummy credentials are enough for LocalStack)
session = Session(
    aws_access_key_id='dummy',
    aws_secret_access_key='dummy',
    region_name='ap-northeast-1'
)
dynamodb = session.resource(
    service_name='dynamodb',
    endpoint_url='http://localhost:4569'
)


def lambda_handler(event, context):
    table = dynamodb.Table('test-table')
    for record in event['Records']:
        # Kinesis delivers the record data base64-encoded. Decode it to text,
        # because the table's key attribute "Data" is of type S (string).
        payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
        print('Decoded Payload:{}'.format(payload))
        table.put_item(
            Item={
                'Data': payload
            }
        )
    return "data put succeeded"
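The handler reads event['Records'][i]['kinesis']['data'], which arrives base64-encoded. To try the decoding step without LocalStack, something like the following sketch can be used. The helpers make_kinesis_event and decode_payloads are made up for this memo, and the event contains only the fields app.py reads; a real Kinesis event carries more (partition key, sequence number, and so on).

```python
import base64


def make_kinesis_event(messages):
    """Build a minimal Kinesis-style Lambda event from plain strings."""
    return {
        "Records": [
            {"kinesis": {"data": base64.b64encode(m.encode("utf-8")).decode("ascii")}}
            for m in messages
        ]
    }


def decode_payloads(event):
    """Decode every record's base64 data back to text, as the handler does."""
    return [
        base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        for record in event["Records"]
    ]


event = make_kinesis_event(["testdata"])
payloads = decode_payloads(event)
print(event["Records"][0]["kinesis"]["data"])
print(payloads)
```

The same event dictionary could be passed to lambda_handler directly, provided the DynamoDB endpoint is reachable.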
- Upload (create the function)
aws lambda create-function --function-name="test-kinesis-trigger" --runtime=python3.8 --role="arn:aws:iam::123456789012:role/service-role/lambda-sample-role" --handler=app.lambda_handler --zip-file fileb://app.zip --endpoint-url=http://localhost:4574 --profile=localstack
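The create-function command expects app.zip to already exist, with app.py at the root of the archive so that the handler app.lambda_handler can be resolved. The memo does not show how the archive was built, so the following is just one possible way. The helper build_zip is hypothetical, and the demo writes a placeholder app.py into a temporary directory instead of using the real file.

```python
import os
import tempfile
import zipfile


def build_zip(source_path, zip_path):
    """Store source_path at the root of a new zip archive and return its path."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        # arcname drops the directory part so the file sits at the archive root.
        archive.write(source_path, arcname=os.path.basename(source_path))
    return zip_path


# Demo with a placeholder handler in a temporary directory.
workdir = tempfile.mkdtemp()
source = os.path.join(workdir, "app.py")
with open(source, "w") as f:
    f.write("def lambda_handler(event, context):\n    return 'ok'\n")

zip_path = build_zip(source, os.path.join(workdir, "app.zip"))
with zipfile.ZipFile(zip_path) as archive:
    names = archive.namelist()
print(names)
```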
- To update the function code
aws lambda update-function-code --function-name="test-kinesis-trigger" --zip-file fileb://app.zip --endpoint-url=http://localhost:4574 --profile=localstack
- Set Kinesis as the event trigger
aws lambda create-event-source-mapping --function-name test-kinesis-trigger \
--batch-size 500 \
--event-source-arn arn:aws:kinesis:us-east-1:000000000000:stream/test-stream --endpoint-url=http://localhost:4574 --profile=localstack
{
"UUID": "0ea9a7f7-4584-4d35-83c1-2390739aa442",
"StartingPosition": "LATEST",
"BatchSize": 100,
"EventSourceArn": "arn:aws:kinesis:us-east-1:000000000000:stream/test-stream",
"FunctionArn": "arn:aws:lambda:us-east-1:000000000000:function:test-kinesis-trigger",
"LastModified": 1636171993.0,
"LastProcessingResult": "OK",
"State": "Enabled",
"StateTransitionReason": "User action"
}
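BatchSize is the upper limit on how many records are handed to the function in one invocation, which is why the handler loops over event['Records']. The sketch below only illustrates that upper bound with made-up numbers. The helper split_into_batches is hypothetical, and the real service may deliver smaller batches depending on timing and shards.

```python
def split_into_batches(records, batch_size):
    """Split records into consecutive chunks of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [records[i:i + batch_size] for i in range(0, len(records), batch_size)]


# 250 pending records with a batch size of 100 (illustrative values only).
batches = split_into_batches(list(range(250)), 100)
sizes = [len(batch) for batch in batches]
print(sizes)
```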
End-to-end check
- Put a record into the stream
aws kinesis put-record --stream-name test-stream --partition-key 123 --data testdata --endpoint-url http://localhost:4568 --profile localstack
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49623662862219041996029299625069687469926616397925842946"
}
- Check DynamoDB
aws dynamodb scan --table-name test-table --endpoint-url=http://localhost:4569 --profile localstack
{
"Items": [
{
"Data": {
"S": "testdata"
}
},
{
"Data": {
"S": "fuga"
}
}
],
"Count": 2,
"ScannedCount": 2,
"ConsumedCapacity": null
}
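Both the item put by hand ("fuga") and the one that came through Kinesis and Lambda ("testdata") are in the table. For a scripted check, the scan output can be reduced to plain values, roughly as follows. The helper string_values is made up for this memo and assumes every item stores the attribute as type S; the JSON is a trimmed copy of the output above.

```python
import json

# Trimmed copy of the scan output shown above.
SCAN_OUTPUT = """
{
  "Items": [
    {"Data": {"S": "testdata"}},
    {"Data": {"S": "fuga"}}
  ],
  "Count": 2,
  "ScannedCount": 2
}
"""


def string_values(scan_json, attribute):
    """Return the sorted string values of one attribute across all scanned items."""
    items = json.loads(scan_json)["Items"]
    return sorted(item[attribute]["S"] for item in items)


values = string_values(SCAN_OUTPUT, "Data")
print(values)
print("testdata" in values)
```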