0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

LocalStackを用いたKinesisをトリガーとしたLambda実行

Last updated at Posted at 2022-01-23
  • LocalStack上で動作するKinesisへのレコード登録をトリガーとしてLambdaを起動し、LambdaからDynamoDBへレコード登録を行う方法についてメモする。

構成

kinesis-trigger.png

Kinesis準備

  • ストリーム作成
aws kinesis create-stream --stream-name test-stream --shard-count 1 --endpoint-url http://localhost:4568 --profile localstack 
  • ストリーム確認
aws kinesis describe-stream --stream-name test-stream --endpoint-url http://localhost:4568 --profile localstack
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49623662862219041996029299625069687469926590353244160002"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:000000000000:stream/test-stream",
        "StreamName": "test-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": null,
        "KeyId": null,
        "StreamCreationTimestamp": null
    }
}

DynamoDB準備

aws dynamodb create-table --table-name test-table --attribute-definitions AttributeName=Data,AttributeType=S --key-schema AttributeName=Data,KeyType=HASH --provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1 --endpoint-url=http://localhost:4569 --profile localstack
{
    "TableDescription": {
        "AttributeDefinitions": [
            {
                "AttributeName": "Data",
                "AttributeType": "S"
            }
        ],
        "TableName": "test-table",
        "KeySchema": [
            {
                "AttributeName": "Data",
                "KeyType": "HASH"
            }
        ],
        "TableStatus": "ACTIVE",
        "CreationDateTime": 1636171713.46,
        "ProvisionedThroughput": {
            "LastIncreaseDateTime": 0.0,
            "LastDecreaseDateTime": 0.0,
            "NumberOfDecreasesToday": 0,
            "ReadCapacityUnits": 1,
            "WriteCapacityUnits": 1
        },
        "TableSizeBytes": 0,
        "ItemCount": 0,
        "TableArn": "arn:aws:dynamodb:ddblocal:000000000000:table/test-table"
    }
}
  • 動作確認(データ登録)
aws dynamodb put-item --table-name test-table --item={\"Data\":{\"S\":\"fuga\"}}  --endpoint-url=http://localhost:4569 --profile localstack
  • 登録データ確認

    aws dynamodb scan --table-name test-table --endpoint-url=http://localhost:4569 --profile localstack
    {
        "Items": [
            {
                "Data": {
                    "S": "fuga"
                }
            }
        ],
        "Count": 1,
        "ScannedCount": 1,
        "ConsumedCapacity": null
    }
    

Lambda準備

  • Lambda処理(app.py)

    ※Kinesisから取得したデータをDynamoDBに登録する。

    from __future__ import print_function
    import base64
    from boto3.session import Session
    
    # DynamoDB 接続設定
    session = Session(
        aws_access_key_id='dummy',
        aws_secret_access_key='dummy',
        region_name='ap-northeast-1'
    )
    
    dynamodb = session.resource(
        service_name='dynamodb', 
        endpoint_url='http://localhost:4569'
    )
    
    def lambda_handler(event, context):
    
        table = dynamodb.Table('test-table')
        for record in event['Records']:
            payload = base64.b64decode(record['kinesis']['data'])
            print('Decoded Payload:{}'.format(payload))
            response = table.put_item(
                    Item={
                        'Data': payload
                    }
                )
        return "data put succeeded"
    
  • アップロード

    aws lambda create-function --function-name="test-kinesis-trigger" --runtime=python3.8 --role="arn:aws:iam::123456789012:role/service-role/lambda-sample-role" --handler=app.lambda_handler --zip-file fileb://app.zip --endpoint-url=http://localhost:4574 --profile=localstack
    
    • 更新したい場合

      aws lambda update-function-code --function-name="test-kinesis-trigger" --zip-file fileb://app.zip --endpoint-url=http://localhost:4574 --profile=localstack
      
  • Kinesisをイベントトリガーに設定

aws lambda create-event-source-mapping --function-name test-kinesis-trigger \
--batch-size 500 \
--event-source-arn arn:aws:kinesis:us-east-1:000000000000:stream/test-stream --endpoint-url=http://localhost:4574 --profile=localstack
{
    "UUID": "0ea9a7f7-4584-4d35-83c1-2390739aa442",
    "StartingPosition": "LATEST",
    "BatchSize": 100,
    "EventSourceArn": "arn:aws:kinesis:us-east-1:000000000000:stream/test-stream",
    "FunctionArn": "arn:aws:lambda:us-east-1:000000000000:function:test-kinesis-trigger",
    "LastModified": 1636171993.0,
    "LastProcessingResult": "OK",
    "State": "Enabled",
    "StateTransitionReason": "User action"
}

動作確認

  • ストリームにレコードを入力

    aws kinesis put-record --stream-name test-stream --partition-key 123 --data testdata --endpoint-url http://localhost:4568 --profile localstack
    {
        "ShardId": "shardId-000000000000",
        "SequenceNumber": "49623662862219041996029299625069687469926616397925842946"
    }
    
  • DynamoDB確認

    aws dynamodb scan --table-name test-table --endpoint-url=http://localhost:4569 --profile localstack
    {
        "Items": [
            {
                "Data": {
                    "S": "testdata"
                }
            },
            {
                "Data": {
                    "S": "fuga"
                }
            }
        ],
        "Count": 2,
        "ScannedCount": 2,
        "ConsumedCapacity": null
    }
    

参考情報

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?