- Use the AWS CLI to run basic operations against Kinesis running on LocalStack.
LocalStack setup
- docker-compose.yml
version: "3.8"

networks:
  container-link:
    name: docker.internal

services:
  localstack:
    container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}"
    image: localstack/localstack
    ports:
      - "127.0.0.1:53:53"                # only required for Pro
      - "127.0.0.1:53:53/udp"            # only required for Pro
      - "127.0.0.1:443:443"              # only required for Pro
      - "127.0.0.1:4510-4530:4510-4530"  # only required for Pro
      - "127.0.0.1:4566:4566"
      - "127.0.0.1:4571:4571"
    environment:
      - SERVICES=${SERVICES- }
      - DEBUG=${DEBUG- }
      - DATA_DIR=${DATA_DIR- }
      - LAMBDA_EXECUTOR=${LAMBDA_EXECUTOR- }
      - LOCALSTACK_API_KEY=${LOCALSTACK_API_KEY- }  # only required for Pro
      - HOST_TMP_FOLDER=${TMPDIR:-/tmp/}localstack
      - DOCKER_HOST=unix:///var/run/docker.sock
    volumes:
      - "${TMPDIR:-/tmp}/localstack:/tmp/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"
    networks:
      - container-link
- Start
docker-compose up
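Every AWS CLI command in this article passes --profile localstack, so that profile has to exist before the commands will run. The following is a minimal sketch of one way to create it with aws configure set. The helper name setup_localstack_profile and the key values "dummy" are assumptions for illustration (LocalStack is generally understood not to validate credentials by default), and the region is taken from the ARN that appears in the sample output.

```bash
#!/usr/bin/env bash

# Hypothetical helper: create the named profile used with --profile.
# The credential values are placeholders (assumption), not real keys.
setup_localstack_profile() {
  local profile="${1:-localstack}"
  aws configure set aws_access_key_id dummy --profile "$profile"
  aws configure set aws_secret_access_key dummy --profile "$profile"
  # Region matches the ARN in the sample output (ap-northeast-1).
  aws configure set region ap-northeast-1 --profile "$profile"
  aws configure set output json --profile "$profile"
}
```

Run setup_localstack_profile once, then check the result with aws configure list --profile localstack.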
Verifying Kinesis
Create a stream
aws kinesis create-stream --stream-name test-stream --shard-count 1 --endpoint-url http://localhost:4566 --profile localstack
Check the stream
aws kinesis describe-stream-summary --stream-name test-stream --endpoint-url http://localhost:4566 --profile localstack
{
    "StreamDescriptionSummary": {
        "StreamName": "test-stream",
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:000000000000:stream/test-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "StreamCreationTimestamp": 1635930057.836,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "OpenShardCount": 1,
        "ConsumerCount": 0
    }
}
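The summary above reports StreamStatus as ACTIVE, which is the state a stream must reach before records can be written to it. When the steps are scripted rather than typed by hand, it may help to block until that state is reached instead of checking manually. The sketch below uses the AWS CLI waiter aws kinesis wait stream-exists for this; the helper name create_stream_and_wait and the two variables are hypothetical, and the endpoint and profile are the ones used throughout this article.

```bash
#!/usr/bin/env bash

ENDPOINT_URL="http://localhost:4566"  # LocalStack port published in docker-compose.yml
PROFILE="localstack"

# Hypothetical helper: create a stream, then block until it is usable.
create_stream_and_wait() {
  local stream="$1" shards="${2:-1}"
  aws kinesis create-stream \
    --stream-name "$stream" --shard-count "$shards" \
    --endpoint-url "$ENDPOINT_URL" --profile "$PROFILE" || return 1
  # The waiter polls the stream description until StreamStatus is ACTIVE.
  aws kinesis wait stream-exists \
    --stream-name "$stream" \
    --endpoint-url "$ENDPOINT_URL" --profile "$PROFILE"
}
```

For example, create_stream_and_wait test-stream 1 would replace the separate create and describe steps in a script.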
Put a record
- Put a data record containing the text "sampledata" into the stream.
aws kinesis put-record --stream-name test-stream --partition-key 123 --data sampledata --endpoint-url http://localhost:4566 --profile localstack
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49623576657528838325581953615390847197490887986010652674",
    "EncryptionType": "NONE"
}
Get records
- Get a shard iterator (the position in the stream and shard from which the consumer reads), then get the records.
# Get a shard iterator
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name test-stream --endpoint-url http://localhost:4566 --profile localstack
# Get records
aws kinesis get-records --shard-iterator AAAAAAAAAAGz8Ur5aPeaf+OzQFT9aXskpl09sq3FzoLJHDkA1EHOsagCPkrrdkR00AgwKZOjnM8Hy8PCQ42QWn/VHX++Ey/INqpDxTfM1FZCdMm76lwRyNkyDfLl8TPGz98RdQYkik5M868WcZV1SScUyM7Q3+ZwFguSaaY9w4czBvD+NZsoRaZgTH1xRAfZupAFzLCqMWI= --endpoint-url http://localhost:4566 --profile localstack
{
    "Records": [
        {
            "SequenceNumber": "49623576657528838325581953615390847197490887986010652674",
            "ApproximateArrivalTimestamp": 1635930293.047,
            "Data": "c2FtcGxlZGF0YQ==",
            "PartitionKey": "123",
            "EncryptionType": "NONE"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAECdEqfAnUgqcHdCV5MP8c8rEg/uNeaoaoHQVvdhD9UXDwWn7LW0ycRm6IPik03JDEjoiyTp8ykQNGmrPy/ZaNi88AKJYNWIIHmmjn5GP0GlG/02UDa3Jjv4R/Lc6MFhzqpPPOQ9o3xGWWNqb7qx5Y2FThy0DAKnnwS3qEqZEvLCrnXtaBVgkXBbixuPBIlP54=",
    "MillisBehindLatest": 0,
    "ChildShards": []
}
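The Data field in the get-records output is base64-encoded: c2FtcGxlZGF0YQ== is the text sampledata that was put earlier. The sketch below shows one way to avoid copying the shard iterator by hand and to decode the payloads, using the CLI's --query and --output text options. The helper names decode_data and read_from_start are hypothetical, the script assumes a base64 command that accepts --decode, and it reads only one batch of records from a single shard.

```bash
#!/usr/bin/env bash

ENDPOINT_URL="http://localhost:4566"  # LocalStack port published in docker-compose.yml
PROFILE="localstack"

# Decode one base64-encoded Data value as returned by get-records.
decode_data() {
  printf '%s' "$1" | base64 --decode
}

# Hypothetical helper: read one batch of records from the oldest
# available position of a shard and print each payload on its own line.
read_from_start() {
  local stream="$1" shard="$2" iterator
  # --query/--output text extract the bare iterator string from the JSON.
  iterator=$(aws kinesis get-shard-iterator \
    --stream-name "$stream" \
    --shard-id "$shard" \
    --shard-iterator-type TRIM_HORIZON \
    --query 'ShardIterator' --output text \
    --endpoint-url "$ENDPOINT_URL" --profile "$PROFILE") || return 1

  # Text output separates multiple values with tabs; split them into lines.
  aws kinesis get-records \
    --shard-iterator "$iterator" \
    --query 'Records[].Data' --output text \
    --endpoint-url "$ENDPOINT_URL" --profile "$PROFILE" |
    tr '\t' '\n' |
    while IFS= read -r data; do
      if [ -n "$data" ]; then
        decode_data "$data"
        echo
      fi
    done
}
```

Calling read_from_start test-stream shardId-000000000000 against the stream above should print the stored payload. To keep reading newer records, a consumer would pass NextShardIterator to the next get-records call, which this sketch leaves out.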
Cleanup
aws kinesis delete-stream --stream-name test-stream --endpoint-url http://localhost:4566 --profile localstack
Note: verify the deletion
aws kinesis list-streams --endpoint-url http://localhost:4566 --profile localstack
{
    "StreamNames": []
}