8
4

More than 5 years have passed since last update.

【AWS】Kinesis上のデータを取得するワンライナー【Kinesis Stream】

Last updated at Posted at 2017-10-24

はじめに

KinesisStream上のデータを直接確認したいケースがままあるため、ワンライナーをメモ。

KinesisStream上のデータを取得するには以下の2ステップが必要です。

  1. Shard Iteratorを取得する
  2. Shard Iteratorを利用して、getRecords APIを実行する

これを1行で行います。
取得対象データのSequence Numberが分かっていることを前提とします。
また、jqにパスが通っている必要があります。

AWS CLIコマンド

基本パターン

$ aws kinesis get-records \
  --shard-iterator `aws kinesis get-shard-iterator \
    --stream-name [ストリーム名] \
    --shard-id [シャードID] \
    --shard-iterator-type AT_SEQUENCE_NUMBER \
    --starting-sequence-number [Sequence Number] \
    | jq ".ShardIterator"` \
  > output_file.json

AWS CLIのプロファイルを指定する

get-shard-iterator及びget-recordsの実行時にそれぞれ同じプロファイルの指定が必要です。
うっかり一方にしか指定しなかった場合は、恐らく意図したデータは取れないでしょう。

$ aws kinesis get-records \
  --profile [プロファイル名] \
  --shard-iterator `aws kinesis get-shard-iterator \
    --stream-name [ストリーム名] \
    --shard-id [シャードID] \
    --shard-iterator-type AT_SEQUENCE_NUMBER \
    --starting-sequence-number [Sequence Number] \
    --profile [プロファイル名] \
    | jq ".ShardIterator"` \
  > output_file.json

取得したレコードを展開する

取得したデータは下記のようなJSON形式で保存されます。

$ cat output_file.json
{
  "Records":[ {
    "Data":"dGVzdGRhdGE=",
    "PartitionKey":"123”,
    "ApproximateArrivalTimestamp": 1.441215410867E9,
    "SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
  },
  {
    "Data":"dGVzdGRhdGE=",
    "PartitionKey":"123",
    "ApproximateArrivalTimestamp": 1.441215410868E9,
    "SequenceNumber":"49544985256907370027570885864065577703022652638596431875"
  } ],
  "MillisBehindLatest":24000,
  "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
}

Sequence Numberを指定した場合、欲しいのは最初のレコードだけかと思いますので、下記のように切り出し、Base64デコードを行います。

$ cat output_file.json | jq --raw-output '.Records[0].Data' | base64 -d > output_file.bin

指定したSequence Numberに対応するレコードを取り出し、Base64デコードを行う場合は次のように指定します。

$ cat output_file.json | jq --raw-output '.Records[] | select(.SequenceNumber == "49544985256907370027570885864065577703022652638596431875") | .Data' \
  | base64 -d > output_file.bin

参照元資料

下記の公式ドキュメントからの焼き直しです。
すみません。

Perform Basic Stream Operations -> Step 3: Get the Record
http://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html#get-records

AWS APIレベルのドキュメントを併せて載せておきます。

GetShardIterator
http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html

GetRecords
http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html

8
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
4