はじめに
KinesisStream上のデータを直接確認したいケースがままあるため、ワンライナーをメモ。
KinesisStream上のデータを取得するには以下の2ステップが必要です。
- Shard Iteratorを取得する
- Shard Iteratorを利用して、getRecords APIを実行する
これを1行で行います。
取得対象データのSequence Numberが分かっていることを前提とします。
また、jqにパスが通っている必要があります。
AWS CLIコマンド
基本パターン
$ aws kinesis get-records \
--shard-iterator `aws kinesis get-shard-iterator \
--stream-name [ストリーム名] \
--shard-id [シャードID] \
--shard-iterator-type AT_SEQUENCE_NUMBER \
--starting-sequence-number [Sequence Number] \
| jq ".ShardIterator"` \
> output_file.json
AWS CLIのプロファイルを指定する
get-shard-iterator及びget-recordsの実行時にそれぞれ同じプロファイルの指定が必要です。
うっかり一方にしか指定しなかった場合は、恐らく意図したデータは取れないでしょう。
$ aws kinesis get-records \
--profile [プロファイル名] \
--shard-iterator `aws kinesis get-shard-iterator \
--stream-name [ストリーム名] \
--shard-id [シャードID] \
--shard-iterator-type AT_SEQUENCE_NUMBER \
--starting-sequence-number [Sequence Number] \
--profile [プロファイル名] \
| jq ".ShardIterator"` \
> output_file.json
取得したレコードを展開する
取得したデータは下記のようなJSON形式で保存されます。
$ cat output_file.json
{
"Records":[ {
"Data":"dGVzdGRhdGE=",
"PartitionKey":"123”,
"ApproximateArrivalTimestamp": 1.441215410867E9,
"SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
},
{
"Data":"dGVzdGRhdGE=",
"PartitionKey":"123",
"ApproximateArrivalTimestamp": 1.441215410868E9,
"SequenceNumber":"49544985256907370027570885864065577703022652638596431875"
} ],
"MillisBehindLatest":24000,
"NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}
Sequence Numberを指定した場合、欲しいのは最初のレコードだけかと思いますので、下記のように切り出し、Base64デコードを行います。
$ cat output_file.json | jq --raw-output '.Records[0].Data' | base64 -d > output_file.bin
指定したSequence Numberに対応するレコードを取り出し、Base64デコードを行う場合は次のように指定します。
$ cat output_file.json | jq --raw-output '.Records[] | select(.SequenceNumber == "49544985256907370027570885864065577703022652638596431875") | .Data' \
| base64 -d > output_file.bin
参照元資料
下記の公式ドキュメントからの焼き直しです。
すみません。
Perform Basic Stream Operations -> Step 3: Get the Record
http://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html#get-records
AWS APIレベルのドキュメントを併せて載せておきます。
GetShardIterator
http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
GetRecords
http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html