0
0

More than 1 year has passed since last update.

LocalStackを用いたKinesisをトリガーとしたLambda実行

Last updated at Posted at 2022-01-23
  • LocalStack上で動作するKinesisへのレコード登録をトリガーとしてLambdaを起動し、LambdaからDynamoDBへレコード登録を行う方法についてメモする。

構成

kinesis-trigger.png

Kinesis準備

  • ストリーム作成
aws kinesis create-stream --stream-name test-stream --shard-count 1 --endpoint-url http://localhost:4568 --profile localstack 
  • ストリーム確認
aws kinesis describe-stream --stream-name test-stream --endpoint-url http://localhost:4568 --profile localstack
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49623662862219041996029299625069687469926590353244160002"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:000000000000:stream/test-stream",
        "StreamName": "test-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": null,
        "KeyId": null,
        "StreamCreationTimestamp": null
    }
}

DynamoDB準備

aws dynamodb create-table --table-name test-table --attribute-definitions AttributeName=Data,AttributeType=S --key-schema AttributeName=Data,KeyType=HASH --provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1 --endpoint-url=http://localhost:4569 --profile localstack
{
    "TableDescription": {
        "AttributeDefinitions": [
            {
                "AttributeName": "Data",
                "AttributeType": "S"
            }
        ],
        "TableName": "test-table",
        "KeySchema": [
            {
                "AttributeName": "Data",
                "KeyType": "HASH"
            }
        ],
        "TableStatus": "ACTIVE",
        "CreationDateTime": 1636171713.46,
        "ProvisionedThroughput": {
            "LastIncreaseDateTime": 0.0,
            "LastDecreaseDateTime": 0.0,
            "NumberOfDecreasesToday": 0,
            "ReadCapacityUnits": 1,
            "WriteCapacityUnits": 1
        },
        "TableSizeBytes": 0,
        "ItemCount": 0,
        "TableArn": "arn:aws:dynamodb:ddblocal:000000000000:table/test-table"
    }
}
  • 動作確認(データ登録)
aws dynamodb put-item --table-name test-table --item={\"Data\":{\"S\":\"fuga\"}}  --endpoint-url=http://localhost:4569 --profile localstack
  • 登録データ確認
  aws dynamodb scan --table-name test-table --endpoint-url=http://localhost:4569 --profile localstack
  {
      "Items": [
          {
              "Data": {
                  "S": "fuga"
              }
          }
      ],
      "Count": 1,
      "ScannedCount": 1,
      "ConsumedCapacity": null
  }

Lambda準備

  • Lambda処理(app.py)

※Kinesisから取得したデータをDynamoDBに登録する。

  from __future__ import print_function
  import base64
  from boto3.session import Session

  # DynamoDB 接続設定
  session = Session(
      aws_access_key_id='dummy',
      aws_secret_access_key='dummy',
      region_name='ap-northeast-1'
  )

  dynamodb = session.resource(
      service_name='dynamodb', 
      endpoint_url='http://localhost:4569'
  )

  def lambda_handler(event, context):

      table = dynamodb.Table('test-table')
      for record in event['Records']:
          payload = base64.b64decode(record['kinesis']['data'])
          print('Decoded Payload:{}'.format(payload))
          response = table.put_item(
                  Item={
                      'Data': payload
                  }
              )
      return "data put succeeded"
  • アップロード
  aws lambda create-function --function-name="test-kinesis-trigger" --runtime=python3.8 --role="arn:aws:iam::123456789012:role/service-role/lambda-sample-role" --handler=app.lambda_handler --zip-file fileb://app.zip --endpoint-url=http://localhost:4574 --profile=localstack
  • 更新したい場合

    aws lambda update-function-code --function-name="test-kinesis-trigger" --zip-file fileb://app.zip --endpoint-url=http://localhost:4574 --profile=localstack
    
    • Kinesisをイベントトリガーに設定
aws lambda create-event-source-mapping --function-name test-kinesis-trigger \
--batch-size 500 \
--event-source-arn arn:aws:kinesis:us-east-1:000000000000:stream/test-stream --endpoint-url=http://localhost:4574 --profile=localstack
{
    "UUID": "0ea9a7f7-4584-4d35-83c1-2390739aa442",
    "StartingPosition": "LATEST",
    "BatchSize": 100,
    "EventSourceArn": "arn:aws:kinesis:us-east-1:000000000000:stream/test-stream",
    "FunctionArn": "arn:aws:lambda:us-east-1:000000000000:function:test-kinesis-trigger",
    "LastModified": 1636171993.0,
    "LastProcessingResult": "OK",
    "State": "Enabled",
    "StateTransitionReason": "User action"
}

動作確認

  • ストリームにレコードを入力
  aws kinesis put-record --stream-name test-stream --partition-key 123 --data testdata --endpoint-url http://localhost:4568 --profile localstack
  {
      "ShardId": "shardId-000000000000",
      "SequenceNumber": "49623662862219041996029299625069687469926616397925842946"
  }
  • DynamoDB確認
  aws dynamodb scan --table-name test-table --endpoint-url=http://localhost:4569 --profile localstack
  {
      "Items": [
          {
              "Data": {
                  "S": "testdata"
              }
          },
          {
              "Data": {
                  "S": "fuga"
              }
          }
      ],
      "Count": 2,
      "ScannedCount": 2,
      "ConsumedCapacity": null
  }

参考情報

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0