【AWS】Kinesis上のデータを取得するワンライナー【Kinesis Stream】

  • 1
    Like
  • 0
    Comment

はじめに

KinesisStream上のデータを直接確認したいケースがままあるため、ワンライナーをメモ。

KinesisStream上のデータを取得するには以下の2ステップが必要です。

  1. Shard Iteratorを取得する
  2. Shard Iteratorを利用して、getRecords APIを実行する

これを1行で行います。
取得対象データのSequence Numberが分かっていることを前提とします。
また、jqにパスが通っている必要があります。

AWS CLIコマンド

基本パターン

$ aws kinesis get-records \
  --shard-iterator `aws kinesis get-shard-iterator \
    --stream-name [ストリーム名] \
    --shard-id [シャードID] \
    --shard-iterator-type AT_SEQUENCE_NUMBER \
    --starting-sequence-number [Sequence Number] \
    | jq ".ShardIterator"` \
  > output_file.json

AWS CLIのプロファイルを指定する

get-shard-iterator及びget-recordsの実行時にそれぞれ同じプロファイルの指定が必要です。
うっかり一方にしか指定しなかった場合は、恐らく意図したデータは取れないでしょう。

$ aws kinesis get-records \
  --profile [プロファイル名] \
  --shard-iterator `aws kinesis get-shard-iterator \
    --stream-name [ストリーム名] \
    --shard-id [シャードID] \
    --shard-iterator-type AT_SEQUENCE_NUMBER \
    --starting-sequence-number [Sequence Number] \
    --profile [プロファイル名] \
    | jq ".ShardIterator"` \
  > output_file.json

取得したレコードを展開する

取得したデータは下記のようなJSON形式で保存されます。

$ cat output_file.json
{
  "Records":[ {
    "Data":"dGVzdGRhdGE=",
    "PartitionKey":"123”,
    "ApproximateArrivalTimestamp": 1.441215410867E9,
    "SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
  },
  {
    "Data":"dGVzdGRhdGE=",
    "PartitionKey":"123”,
    "ApproximateArrivalTimestamp": 1.441215410868E9,
    "SequenceNumber":"49544985256907370027570885864065577703022652638596431875"
  } ],
  "MillisBehindLatest":24000,
  "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}

Sequence Numberを指定した場合、欲しいのは最初のレコードだけかと思いますので、下記のように切り出し、Base64デコードを行います。

$ cat output_file.json | jq --raw-output '.Records[0].Data' | base64 -d > output_file.bin

参照元資料

下記の公式ドキュメントからの焼き直しです。
すみません。

Perform Basic Stream Operations -> Step 3: Get the Record
http://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html#get-records

AWS APIレベルのドキュメントを併せて載せておきます。

GetShardIterator
http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html

GetRecords
http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html