- LocalStack上で動くAWS Kinesisのストリームに対して、AWS SAM CLIで動くLambdaからレコード入力/取得を行う方法をメモする。
構成
LocalStack準備
-
docker-compose.yml
version: "3.8" networks: container-link: name: docker.internal services: localstack: container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}" image: localstack/localstack ports: - "127.0.0.1:53:53" # only required for Pro - "127.0.0.1:53:53/udp" # only required for Pro - "127.0.0.1:443:443" # only required for Pro - "127.0.0.1:4510-4530:4510-4530" # only required for Pro - "127.0.0.1:4566:4566" - "127.0.0.1:4571:4571" environment: - SERVICES=${SERVICES- } - DEBUG=${DEBUG- } - DATA_DIR=${DATA_DIR- } - LAMBDA_EXECUTOR=${LAMBDA_EXECUTOR- } - LOCALSTACK_API_KEY=${LOCALSTACK_API_KEY- } # only required for Pro - HOST_TMP_FOLDER=${TMPDIR:-/tmp/}localstack - DOCKER_HOST=unix:///var/run/docker.sock volumes: - "${TMPDIR:-/tmp}/localstack:/tmp/localstack" - "/var/run/docker.sock:/var/run/docker.sock" networks: - container-link
-
起動
docker-compose up
Kinesis準備
ストリーム作成
aws kinesis create-stream --stream-name test-stream --shard-count 1 --endpoint-url http://localhost:4566 --profile localstack
ストリーム確認
aws kinesis describe-stream-summary --stream-name test-stream --endpoint-url http://localhost:4566 --profile localstack
{
"StreamDescriptionSummary": {
"StreamName": "test-stream",
"StreamARN": "arn:aws:kinesis:ap-northeast-1:000000000000:stream/test-stream",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"StreamCreationTimestamp": 1635931411.289,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"OpenShardCount": 1,
"ConsumerCount": 0
}
}
Lambda準備
Lambda
-
ひな形作成
sam init
※ランタイムはPython3.8を選択。
-
template.yml
-
Hello World Exampleを利用する。
-
POST(レコード入力)/GET(レコード取得)用に以下修正
Events: SendRecord: Type: Api Properties: Path: /record Method: post GetRecord: Type: Api Properties: Path: /record Method: get
-
-
requrements.txt
requests boto3
※Kinesisアクセス用にboto3追加
-
Lambda処理(
app.py
)修正import json from boto3.session import Session # Kinesis ストリーム名 STREAM_NAME = 'test-stream' # Kinesis 接続設定 session = Session( aws_access_key_id='dummy', aws_secret_access_key='dummy', region_name='ap-northeast-1' ) client = session.client( service_name='kinesis', endpoint_url='http://localstack:4566' ) def lambda_handler(event, context): if event["httpMethod"] == "POST": # レコード入力 response = client.put_record( Data=json.dumps(event["body"]), PartitionKey='123', StreamName=STREAM_NAME ) formatted_res = { "ShardId":response["ShardId"], "SequenceNumber":response["SequenceNumber"], "EncryptionType":response["EncryptionType"] } return { "statusCode": 200, "body": json.dumps(formatted_res), } if event["httpMethod"] == "GET": # シャードID取得 shards = client.list_shards(StreamName=STREAM_NAME) shardId = shards["Shards"][0]["ShardId"] shardIteratorType = 'TRIM_HORIZON' # シャードイテレーター取得 shardIterator = client.get_shard_iterator( StreamName=STREAM_NAME, ShardId=shardId, ShardIteratorType=shardIteratorType ) # レコード取得 response = client.get_records( ShardIterator=shardIterator["ShardIterator"] ) data = [] for record in response["Records"]: data.append(record["Data"].decode()) formatted_res = { "Data":data } return { "statusCode": 200, "body": json.dumps(formatted_res), }
動作確認
-
ビルド
sam build
-
実行
sam local start-api --docker-network docker.internal
-
レコード入力
-
リクエスト
POST /record HTTP/1.1
-
Host: localhost:3000
Content-Type: application/json
Content-Length: 32
{
"record":"test record"
}
```
-
レスポンス
{ "ShardId": "shardId-000000000000", "SequenceNumber": "49623577140652182306549373353623065707784565606815105026", "EncryptionType": "NONE"
}
```
-
レコード取得
-
リクエスト
GET /record HTTP/1.1 Host: localhost:3000 Content-Type: application/json
-
レスポンス
{ "Data": [ "{\\n \\\"record\\\":\\\"test record\\\"\\n}", "{\\n \\\"record\\\":\\\"test record\\\"\\n}", "{\\n \\\"record\\\":\\\"test record\\\"\\n}" ] }
-