LoginSignup
2
1

More than 5 years have passed since last update.

Java経験ゼロからのKinesis Data Streams(2)

Last updated at Posted at 2018-10-13
  • AWSは触るがスクリプト言語(Python、Javascript)がメイン

なエンジニアの勉強記録。

チュートリアル: AWS CLI を使用した Amazon Kinesis Data Streams の使用開始について。
AWS CLIはよく触る人向けに、パラメータについて気になった点を調べつつ。
(入出力で長くなるところは...で略しています)


Streamの作成

aws kinesis create-stream --stream-name Foo --shard-count 1

Streamの詳細取得

aws kinesis describe-stream --stream-name Foo
output
{
  "StreamDescription": {
    "Shards": [
      {
        "ShardId": "shardId-000000000000",
        "HashKeyRange": {
          "StartingHashKey": "0",
          "EndingHashKey": "340282366920938463463374607431768211455"
        },
        "SequenceNumberRange": {
          "StartingSequenceNumber": "49589148559069995537682398109276352603103412499188285442"
        }
      }
    ],
    "StreamARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxxx:stream/Foo",
    "StreamName": "Foo",
    "StreamStatus": "ACTIVE",
    "RetentionPeriodHours": 24,
    "EnhancedMonitoring": [
      {
        "ShardLevelMetrics": []
      }
    ],
    "EncryptionType": "NONE",
    "KeyId": null,
    "StreamCreationTimestamp": 1539441977.0
  }
}

出力パラメータについて

※以降公式チュートリアルから少し逸れます(Kinesisの状態も厳密には一緒にはならなくなります、問題ないと思いますが)

HashKeyRange

この後Kinesisにデータを入れる時Partition Keyを指定しますが、そのキーについてMD5ハッシュをかけたものがどちらに収まるかで、書き込み先のシャードが分散されます。

# シャード数を1→2へアップデート
aws kinesis update-shard-count --stream-name Foo --target-shard-count 2 --scaling-type UNIFORM_SCALING
# しばらくしてから再度Discribe
aws kinesis describe-stream --stream-name Foo

こうするとShards:の中身が

output
{
  "ShardId": "shardId-000000000000",
  "HashKeyRange": {
    "StartingHashKey": "0",
    "EndingHashKey": "340282366920938463463374607431768211455"
  },
  "SequenceNumberRange": {
    "StartingSequenceNumber": "49589148559069995537682398109276352603103412499188285442",
    "EndingSequenceNumber": "49589148559081145910281663420845911536420133664358137858"
  }
},
{
  "ShardId": "shardId-000000000001",
  "ParentShardId": "shardId-000000000000",
  "HashKeyRange": {
    "StartingHashKey": "0",
    "EndingHashKey": "170141183460469231731687303715884105727"
  },
  "SequenceNumberRange": {
    "StartingSequenceNumber": "49589148619750323222884223677395042022979615839272042514"
  }
},
{
  "ShardId": "shardId-000000000002",
  "ParentShardId": "shardId-000000000000",
  "HashKeyRange": {
    "StartingHashKey": "170141183460469231731687303715884105728",
    "EndingHashKey": "340282366920938463463374607431768211455"
  },
  "SequenceNumberRange": {
    "StartingSequenceNumber": "49589148619772623968082754300536577741252264200778022946"
  }
}

となります。
shardId-000000000000はClose(書き込めない)状態になり、新たにshardId-000000000001shardId-000000000002がOpenしました。
shardId-000000000000では0 〜 340282366920938463463374607431768211455の範囲だったStartingHashKey, EndingHashKeyが、shardId-000000000001shardId-000000000002の間で等分されていることがわかります。

SequenceNumberRange

Kinesisに納められたレコードの順序制御番号(SequenceNumber)の範囲を示す項目。
公式用語集によると

各データレコードにはそのシャード内で一意のシーケンス番号があります。client.putRecords または client.putRecord を使用してストリームに書き込むと、Kinesis Data Streams によってシーケンス番号が割り当てられます。同じパーティションキーのシーケンス番号は一般的に、時間の経過とともに大きくなります。書き込みリクエスト間の期間が長くなるほど、シーケンス番号は大きくなります。

とあるので、この値の範囲の開始点をしめすのがStartingSequenceNumberのようです。

Streamの一覧取得

aws kinesis list-streams

Recordの入力

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
output
{
  "ShardId": "shardId-000000000001",
  "SequenceNumber": "49589148619750323222884223677402295577897324024004870162"
}

PartitionKeyを123としたところ、shardId-000000000001へと格納されました。
shellでこのパーティションキーをハッシュ化してみると

echo -n 123| md5| tr '[:lower:]' '[:upper:]'| xargs -I@ echo "obase=10;ibase=16;@"| bc
output
42767516990368493138776584305024125808 # < 170141183460469231731687303715884105728

となるため、"shardId-000000000001"へと書き込まれるわけです。
PartitionKeyを例えばcatとすると、

aws kinesis put-record --stream-name Foo --partition-key cat --data testdata
output
{
  "ShardId": "shardId-000000000002",
  "SequenceNumber": "49589148619772623968082754300545040221989606737175380002"
}

ハッシュ値が277102220249073555409885156483852860632になるのでshardId-000000000002の方へと書き込まれます。

ShardIteratorの取得

コンシューマがあるシャードのどの位置を読んでいるか、を示すShardIteratorをレコードを取得する際に渡す必要があります。まずはそのShardIteratorを取得します。

aws kinesis get-shard-iterator --shard-id shardId-000000000001 --shard-iterator-type TRIM_HORIZON --stream-name Foo
output
{
  "ShardIterator": "AAAAA...evkSB" 
}

オプション

shard-iterator-type

保持されているレコードのどの部分をShardIteratorとして取得するかを指定できる(ドキュメント

  • AT_SEQUENCE_NUMBER
  • AFTER_SEQUENCE_NUMBER
    この2つは特定のsequence numberの位置(もしくは次の位置)からShardIteratorを取得。別オプションのStartingSequenceNumberにてそのsequence numberを指定。

  • AT_TIMESTAMP
    特定の時点からShardIteratorを取得。別オプションのStartingSequenceNumberにてそのsequence numberを指定。別オプションTimestampにてタイムスタンプを指定。

  • TRIM_HORIZON
    そのシャード中で刈り取られていない、もっとも古いデータのShardIteratorを取得。

  • LATEST
    そのシャード中でもっとも新しいデータのShardIteratorを取得。

Recordの取得

aws kinesis get-records --shard-iterator AAAAA...evkSB
output
{
  "Records": [
    {
      "SequenceNumber": "49589148619750323222884223677402295577897324024004870162",
      "ApproximateArrivalTimestamp": 1539442442.446,
      "Data": "dGVzdGRhdGE=",
      "PartitionKey": "123"
    }
  ],
  "NextShardIterator": "AAAAA...kA5HY",
  "MillisBehindLatest": 0
}

出力パラメータについて

Data

レコードの中身(今回の場合は"testdata")をBase64エンコードしたもの。

echo dGVzdGRhdGE=| base64 -D

などで結果確認可能。put-recordではBase64エンコードしたものが格納されるためこうなります。
ちなみに

実際には、AWS CLI を使用してデータを利用することはまれであり、通常は前に示したように(describe-stream および list-streams)、ストリームの状態をモニタリングしたり、情報を取得したりするために使用されるからです。

だそうです。SDKやKCLを利用することを基本は想定していることもあり、CLIはb64デコード機能はもたない模様。

Streamの削除

aws kinesis delete-stream --stream-name Foo

軽く流すつもりが結構色々Kinesisにとって重要なキーワードばかりだったので理解が深まって良かったです。次は最後のチュートリアルの模様。

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1