- AWS CLI を使用してLocalStack上で動作する Kinesis に対して基本的なオペレーションの実行する。
LocalStack準備
docker-compose.yml
version: "3.8"
networks:
container-link:
name: docker.internal
services:
localstack:
container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}"
image: localstack/localstack
ports:
- "127.0.0.1:53:53" # only required for Pro
- "127.0.0.1:53:53/udp" # only required for Pro
- "127.0.0.1:443:443" # only required for Pro
- "127.0.0.1:4510-4530:4510-4530" # only required for Pro
- "127.0.0.1:4566:4566"
- "127.0.0.1:4571:4571"
environment:
- SERVICES=${SERVICES- }
- DEBUG=${DEBUG- }
- DATA_DIR=${DATA_DIR- }
- LAMBDA_EXECUTOR=${LAMBDA_EXECUTOR- }
- LOCALSTACK_API_KEY=${LOCALSTACK_API_KEY- } # only required for Pro
- HOST_TMP_FOLDER=${TMPDIR:-/tmp/}localstack
- DOCKER_HOST=unix:///var/run/docker.sock
volumes:
- "${TMPDIR:-/tmp}/localstack:/tmp/localstack"
- "/var/run/docker.sock:/var/run/docker.sock"
networks:
- container-link
- 起動
docker-compose up
Kinesis 動作確認
ストリーム作成
aws kinesis create-stream --stream-name test-stream --shard-count 1 --endpoint-url http://localhost:4566 --profile localstack
ストリーム確認
aws kinesis describe-stream-summary --stream-name test-stream --endpoint-url http://localhost:4566 --profile localstack
{
"StreamDescriptionSummary": {
"StreamName": "test-stream",
"StreamARN": "arn:aws:kinesis:ap-northeast-1:000000000000:stream/test-stream",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"StreamCreationTimestamp": 1635930057.836,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"OpenShardCount": 1,
"ConsumerCount": 0
}
}
レコードを入力する
-
sampledata
というテキストを含むデータレコードをストリームに入力する。
aws kinesis put-record --stream-name test-stream --partition-key 123 --data sampledata --endpoint-url http://localhost:4566 --profile localstack
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49623576657528838325581953615390847197490887986010652674",
"EncryptionType": "NONE"
}
レコードを取得する
- シャードイテレーター(コンシューマー が読み取るストリームとシャードの位置)の取得+レコード取得
# シャードイテレーター取得
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name test-stream --endpoint-url http://localhost:4566 --profile localstack
# レコード取得
aws kinesis get-records --shard-iterator AAAAAAAAAAGz8Ur5aPeaf+OzQFT9aXskpl09sq3FzoLJHDkA1EHOsagCPkrrdkR00AgwKZOjnM8Hy8PCQ42QWn/VHX++Ey/INqpDxTfM1FZCdMm76lwRyNkyDfLl8TPGz98RdQYkik5M868WcZV1SScUyM7Q3+ZwFguSaaY9w4czBvD+NZsoRaZgTH1xRAfZupAFzLCqMWI= --endpoint-url http://localhost:4566 --profile localstack
{
"Records": [
{
"SequenceNumber": "49623576657528838325581953615390847197490887986010652674",
"ApproximateArrivalTimestamp": 1635930293.047,
"Data": "c2FtcGxlZGF0YQ==",
"PartitionKey": "123",
"EncryptionType": "NONE"
}
],
"NextShardIterator": "AAAAAAAAAAECdEqfAnUgqcHdCV5MP8c8rEg/uNeaoaoHQVvdhD9UXDwWn7LW0ycRm6IPik03JDEjoiyTp8ykQNGmrPy/ZaNi88AKJYNWIIHmmjn5GP0GlG/02UDa3Jjv4R/Lc6MFhzqpPPOQ9o3xGWWNqb7qx5Y2FThy0DAKnnwS3qEqZEvLCrnXtaBVgkXBbixuPBIlP54=",
"MillisBehindLatest": 0,
"ChildShards": []
}
クリーンアップ
aws kinesis delete-stream --stream-name test-stream --endpoint-url http://localhost:4566 --profile localstack
※確認
aws kinesis list-streams --endpoint-url http://localhost:4566 --profile localstack
{
"StreamNames": []
}