More than 3 years have passed since last update.

Notes: Putting and Getting Records on Amazon Kinesis (LocalStack) from Lambda (SAM)

  • These notes describe how to put and get records on an AWS Kinesis stream running on LocalStack from a Lambda function run locally with the AWS SAM CLI.

Architecture

sam-kinesis.png

Preparing LocalStack

  • docker-compose.yml

    version: "3.8"
    networks:
      container-link:
        name: docker.internal
    services:
      localstack:
        container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}"
        image: localstack/localstack
        ports:
          - "127.0.0.1:53:53"                # only required for Pro
          - "127.0.0.1:53:53/udp"            # only required for Pro
          - "127.0.0.1:443:443"              # only required for Pro
          - "127.0.0.1:4510-4530:4510-4530"  # only required for Pro
          - "127.0.0.1:4566:4566"
          - "127.0.0.1:4571:4571"
        environment:
          - SERVICES=${SERVICES- }
          - DEBUG=${DEBUG- }
          - DATA_DIR=${DATA_DIR- }
          - LAMBDA_EXECUTOR=${LAMBDA_EXECUTOR- }
          - LOCALSTACK_API_KEY=${LOCALSTACK_API_KEY- }  # only required for Pro
          - HOST_TMP_FOLDER=${TMPDIR:-/tmp/}localstack
          - DOCKER_HOST=unix:///var/run/docker.sock
        volumes:
          - "${TMPDIR:-/tmp}/localstack:/tmp/localstack"
          - "/var/run/docker.sock:/var/run/docker.sock"
        networks:
          - container-link
    
  • Start LocalStack

    docker-compose up
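
Before creating the stream, it can help to confirm that the LocalStack edge port is accepting connections. The following is a minimal sketch using only the standard library; `is_port_open` is a hypothetical helper name, and the check only proves that something is listening on the port, not that Kinesis itself is ready.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


if __name__ == "__main__":
    # 4566 is the edge port published in docker-compose.yml.
    print("LocalStack reachable:", is_port_open("127.0.0.1", 4566))
```

If this prints `False`, the container is probably still starting or the port mapping differs from the compose file.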
    

Preparing Kinesis

Create the stream

aws kinesis create-stream --stream-name test-stream --shard-count 1 --endpoint-url http://localhost:4566 --profile localstack 

Verify the stream

aws kinesis describe-stream-summary --stream-name test-stream --endpoint-url http://localhost:4566 --profile localstack
{
    "StreamDescriptionSummary": {
        "StreamName": "test-stream",
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:000000000000:stream/test-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "StreamCreationTimestamp": 1635931411.289,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "OpenShardCount": 1,
        "ConsumerCount": 0
    }
}
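
The same two steps can also be scripted with boto3 instead of the AWS CLI. This is a rough sketch, not part of the original setup: `make_localstack_client` and `ensure_stream` are hypothetical helper names, and the dummy credentials and `ap-northeast-1` region are assumptions standing in for whatever the `localstack` CLI profile contains.

```python
import time


def make_localstack_client():
    """Build a Kinesis client that points at LocalStack on the host."""
    import boto3  # imported lazily so ensure_stream can be exercised without boto3

    return boto3.client(
        "kinesis",
        endpoint_url="http://localhost:4566",
        region_name="ap-northeast-1",       # assumption: matches the CLI profile
        aws_access_key_id="dummy",          # assumption: any value works locally
        aws_secret_access_key="dummy",
    )


def ensure_stream(client, stream_name: str, shard_count: int = 1,
                  attempts: int = 10, delay: float = 0.5) -> dict:
    """Create the stream if it is missing, then poll until it is ACTIVE."""
    try:
        client.create_stream(StreamName=stream_name, ShardCount=shard_count)
    except client.exceptions.ResourceInUseException:
        pass  # the stream already exists

    for _ in range(attempts):
        summary = client.describe_stream_summary(StreamName=stream_name)
        description = summary["StreamDescriptionSummary"]
        if description["StreamStatus"] == "ACTIVE":
            return description
        time.sleep(delay)
    raise TimeoutError(f"{stream_name} did not become ACTIVE")
```

With LocalStack running, `ensure_stream(make_localstack_client(), "test-stream")` should return a summary like the CLI output above.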

Preparing Lambda

Lambda

  • Create the scaffold

    sam init

    Note: select Python 3.8 as the runtime.

  • template.yml

    • Use the Hello World Example template.

    • Modify the Events section as follows to handle POST (put a record) and GET (get records):

            Events:
              SendRecord:
                Type: Api
                Properties:
                  Path: /record
                  Method: post
              GetRecord:
                Type: Api
                Properties:
                  Path: /record
                  Method: get
      
  • requirements.txt

    requests
    boto3

    Note: boto3 is added for access to Kinesis.

  • Modify the Lambda handler (app.py)

    import json
    from boto3.session import Session

    # Kinesis stream name
    STREAM_NAME = 'test-stream'

    # Kinesis connection settings (dummy credentials are enough for LocalStack)
    session = Session(
        aws_access_key_id='dummy',
        aws_secret_access_key='dummy',
        region_name='ap-northeast-1'
    )
    # "localstack" is the service name in docker-compose.yml; it resolves
    # because the Lambda container joins the docker.internal network.
    client = session.client(
        service_name='kinesis',
        endpoint_url='http://localstack:4566'
    )

    def lambda_handler(event, context):

        if event["httpMethod"] == "POST":
            # Put a record. event["body"] is already a string, so json.dumps
            # stores it as a JSON-encoded string (quotes and newlines escaped).
            response = client.put_record(
                Data=json.dumps(event["body"]),
                PartitionKey='123',
                StreamName=STREAM_NAME
            )
            formatted_res = {
                "ShardId":response["ShardId"],
                "SequenceNumber":response["SequenceNumber"],
                "EncryptionType":response["EncryptionType"]
            }
            return {
                "statusCode": 200,
                "body": json.dumps(formatted_res),
            }
        if event["httpMethod"] == "GET":
            # Get the ID of the first (and only) shard
            shards = client.list_shards(StreamName=STREAM_NAME)
            shardId = shards["Shards"][0]["ShardId"]
            # TRIM_HORIZON starts reading from the oldest record in the shard
            shardIteratorType = 'TRIM_HORIZON'
            # Get a shard iterator
            shardIterator = client.get_shard_iterator(
                StreamName=STREAM_NAME,
                ShardId=shardId,
                ShardIteratorType=shardIteratorType
            )
            # Get the records (a single batch)
            response = client.get_records(
                ShardIterator=shardIterator["ShardIterator"]
            )
            data = []
            for record in response["Records"]:
                data.append(record["Data"].decode())
            formatted_res = {
                "Data":data
            }
            return {
                "statusCode": 200,
                "body": json.dumps(formatted_res),
            }
        # Any other HTTP method is not routed by template.yml; reject it explicitly
        return {
            "statusCode": 405,
            "body": json.dumps({"message": "Method Not Allowed"}),
        }
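
The GET branch above issues a single `get_records` call, so it returns only the first batch from the shard. A possible extension, sketched here under assumptions, follows `NextShardIterator` until the read has caught up. `read_all_records` and its `max_calls` cap are hypothetical names, and the client is passed in as a parameter rather than created at module level.

```python
def read_all_records(client, stream_name: str, max_calls: int = 10) -> list:
    """Read the first shard from TRIM_HORIZON, following NextShardIterator."""
    shard_id = client.list_shards(StreamName=stream_name)["Shards"][0]["ShardId"]
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]

    data = []
    for _ in range(max_calls):  # hard cap so an open shard cannot loop forever
        response = client.get_records(ShardIterator=iterator, Limit=100)
        data.extend(record["Data"].decode() for record in response["Records"])
        iterator = response.get("NextShardIterator")
        # MillisBehindLatest == 0 means the read has reached the tip of the shard.
        # If the field is missing, this stops after one call, like the handler above.
        if iterator is None or response.get("MillisBehindLatest", 0) == 0:
            break
    return data
```

Passing the client in also makes the logic easy to exercise with a stub object, without LocalStack running.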
    
    

Testing

  • Build

    sam build

  • Run

    sam local start-api --docker-network docker.internal
    
  • Put a record

    • Request

      POST /record HTTP/1.1
      Host: localhost:3000
      Content-Type: application/json
      Content-Length: 32

      {
          "record":"test record"
      }

    • Response

      {
          "ShardId": "shardId-000000000000",
          "SequenceNumber": "49623577140652182306549373353623065707784565606815105026",
          "EncryptionType": "NONE"
      }

  • Get records

    • Request

      GET /record HTTP/1.1
      Host: localhost:3000
      Content-Type: application/json

    • Response

      {
          "Data": [
              "{\\n    \\\"record\\\":\\\"test record\\\"\\n}",
              "{\\n    \\\"record\\\":\\\"test record\\\"\\n}",
              "{\\n    \\\"record\\\":\\\"test record\\\"\\n}"
          ]
      }
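
The two requests above can also be issued from a script, and the escaped entries in the GET response can be decoded back into objects. The sketch below uses only the standard library; `build_put_request`, `decode_entry`, and `decode_records` are hypothetical helper names. It assumes each `Data` entry is the request body wrapped in one or more layers of JSON string encoding, which is what `json.dumps(event["body"])` in the handler would produce.

```python
import json
import urllib.request

BASE_URL = "http://localhost:3000"  # address served by sam local start-api


def build_put_request(record: dict, base_url: str = BASE_URL) -> urllib.request.Request:
    """Build (but do not send) the POST /record request."""
    return urllib.request.Request(
        url=f"{base_url}/record",
        data=json.dumps(record).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def decode_entry(entry: str):
    """Peel JSON layers until the value is no longer a JSON-encoded string."""
    value = entry
    while isinstance(value, str):
        try:
            value = json.loads(value)
        except json.JSONDecodeError:
            break  # plain text, not another JSON layer
    return value


def decode_records(response_body: str) -> list:
    """Decode every entry of the "Data" list in a GET /record response body."""
    return [decode_entry(entry) for entry in json.loads(response_body)["Data"]]
```

With the API running, `urllib.request.urlopen(build_put_request({"record": "test record"}))` sends the record, and `decode_records` can be applied to the body returned by a GET on the same path.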
      
