- AWSは触るがスクリプト言語(Python、Javascript)がメイン
なエンジニアの勉強記録。
チュートリアル: AWS CLI を使用した Amazon Kinesis Data Streams の使用開始について。
AWS CLIはよく触る人向けに、パラメータについて気になった点を調べつつ。
(入出力で長くなるところは...
で略しています)
Streamの作成
aws kinesis create-stream --stream-name Foo --shard-count 1
Streamの詳細取得
aws kinesis describe-stream --stream-name Foo
{
"StreamDescription": {
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49589148559069995537682398109276352603103412499188285442"
}
}
],
"StreamARN": "arn:aws:kinesis:ap-northeast-1:xxxxxxxxxxxx:stream/Foo",
"StreamName": "Foo",
"StreamStatus": "ACTIVE",
"RetentionPeriodHours": 24,
"EnhancedMonitoring": [
{
"ShardLevelMetrics": []
}
],
"EncryptionType": "NONE",
"KeyId": null,
"StreamCreationTimestamp": 1539441977.0
}
}
出力パラメータについて
※以降公式チュートリアルから少し逸れます(Kinesisの状態も厳密には一緒にはならなくなります、問題ないと思いますが)
HashKeyRange
この後Kinesisにデータを入れる時Partition Keyを指定しますが、そのキーについてMD5ハッシュをかけたものがどちらに収まるかで、書き込み先のシャードが分散されます。
# シャード数を1→2へアップデート
aws kinesis update-shard-count --stream-name Foo --target-shard-count 2 --scaling-type UNIFORM_SCALING
# しばらくしてから再度Discribe
aws kinesis describe-stream --stream-name Foo
こうするとShards:
の中身が
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49589148559069995537682398109276352603103412499188285442",
"EndingSequenceNumber": "49589148559081145910281663420845911536420133664358137858"
}
},
{
"ShardId": "shardId-000000000001",
"ParentShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "0",
"EndingHashKey": "170141183460469231731687303715884105727"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49589148619750323222884223677395042022979615839272042514"
}
},
{
"ShardId": "shardId-000000000002",
"ParentShardId": "shardId-000000000000",
"HashKeyRange": {
"StartingHashKey": "170141183460469231731687303715884105728",
"EndingHashKey": "340282366920938463463374607431768211455"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49589148619772623968082754300536577741252264200778022946"
}
}
となります。
shardId-000000000000
はClose(書き込めない)状態になり、新たにshardId-000000000001
、shardId-000000000002
がOpenしました。
shardId-000000000000
では0 〜 340282366920938463463374607431768211455の範囲だったStartingHashKey
, EndingHashKey
が、shardId-000000000001
、shardId-000000000002
の間で等分されていることがわかります。
SequenceNumberRange
Kinesisに納められたレコードの順序制御番号(SequenceNumber)の範囲を示す項目。
公式用語集によると
各データレコードにはそのシャード内で一意のシーケンス番号があります。client.putRecords または client.putRecord を使用してストリームに書き込むと、Kinesis Data Streams によってシーケンス番号が割り当てられます。同じパーティションキーのシーケンス番号は一般的に、時間の経過とともに大きくなります。書き込みリクエスト間の期間が長くなるほど、シーケンス番号は大きくなります。
とあるので、この値の範囲の開始点をしめすのがStartingSequenceNumber
のようです。
Streamの一覧取得
aws kinesis list-streams
Recordの入力
aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
{
"ShardId": "shardId-000000000001",
"SequenceNumber": "49589148619750323222884223677402295577897324024004870162"
}
PartitionKeyを123
としたところ、shardId-000000000001
へと格納されました。
shellでこのパーティションキーをハッシュ化してみると
echo -n 123| md5| tr '[:lower:]' '[:upper:]'| xargs -I@ echo "obase=10;ibase=16;@"| bc
42767516990368493138776584305024125808 # < 170141183460469231731687303715884105728
となるため、"shardId-000000000001"
へと書き込まれるわけです。
PartitionKeyを例えばcat
とすると、
aws kinesis put-record --stream-name Foo --partition-key cat --data testdata
{
"ShardId": "shardId-000000000002",
"SequenceNumber": "49589148619772623968082754300545040221989606737175380002"
}
ハッシュ値が277102220249073555409885156483852860632
になるのでshardId-000000000002
の方へと書き込まれます。
ShardIteratorの取得
コンシューマがあるシャードのどの位置を読んでいるか、を示すShardIterator
をレコードを取得する際に渡す必要があります。まずはそのShardIterator
を取得します。
aws kinesis get-shard-iterator --shard-id shardId-000000000001 --shard-iterator-type TRIM_HORIZON --stream-name Foo
{
"ShardIterator": "AAAAA...evkSB"
}
オプション
shard-iterator-type
保持されているレコードのどの部分をShardIterator
として取得するかを指定できる(ドキュメント)
AT_SEQUENCE_NUMBER
AFTER_SEQUENCE_NUMBER
この2つは特定のsequence numberの位置(もしくは次の位置)からShardIteratorを取得。別オプションのStartingSequenceNumber
にてそのsequence numberを指定。AT_TIMESTAMP
特定の時点からShardIteratorを取得。別オプションのStartingSequenceNumber
にてそのsequence numberを指定。別オプションTimestamp
にてタイムスタンプを指定。TRIM_HORIZON
そのシャード中で刈り取られていない、もっとも古いデータのShardIteratorを取得。LATEST
そのシャード中でもっとも新しいデータのShardIteratorを取得。
Recordの取得
aws kinesis get-records --shard-iterator AAAAA...evkSB
{
"Records": [
{
"SequenceNumber": "49589148619750323222884223677402295577897324024004870162",
"ApproximateArrivalTimestamp": 1539442442.446,
"Data": "dGVzdGRhdGE=",
"PartitionKey": "123"
}
],
"NextShardIterator": "AAAAA...kA5HY",
"MillisBehindLatest": 0
}
出力パラメータについて
Data
レコードの中身(今回の場合は"testdata"
)をBase64エンコードしたもの。
echo dGVzdGRhdGE=| base64 -D
などで結果確認可能。put-record
ではBase64エンコードしたものが格納されるためこうなります。
ちなみに
実際には、AWS CLI を使用してデータを利用することはまれであり、通常は前に示したように(describe-stream および list-streams)、ストリームの状態をモニタリングしたり、情報を取得したりするために使用されるからです。
だそうです。SDKやKCLを利用することを基本は想定していることもあり、CLIはb64デコード機能はもたない模様。
Streamの削除
aws kinesis delete-stream --stream-name Foo
軽く流すつもりが結構色々Kinesisにとって重要なキーワードばかりだったので理解が深まって良かったです。次は最後のチュートリアルの模様。