Kinesisの雰囲気をつかむためにawscliで操作する

  • 9
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

やること

Streamの作成、Recordの追加/確認、Streamの削除、Shardの分割/結合をawscliコマンドで確認する

コマンド

Stream作成

コマンド投入後はStreamStatusがCREATINGになり、数秒でACTIVEとなり使用できるようになりました。

$ stream_name=好きな名前
$ aws kinesis create-stream --stream-name $stream_name --shard-count 1

put-recordでデータを投入

put-recordsで入れるのがデフォだが、テストなのでput-recordを使う。古いawscliではput-recordsがサポートされていないのでもし使うときは注意。

$ aws kinesis put-record --stream-name $stream_name --data hoge --partition-key key1
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49551577762526400408247905360155882849256718234908884994"
}
$ aws kinesis put-record --stream-name $stream_name --data foo --partition-key key2
{
    "SequenceNumber": "49551577762526400408247905360157091775076333207680974850",
    "ShardId": "shardId-000000000000"
}

get-recordsでデータ取り出し

  • shard_id取得、shard_iteratorを取得してからget-recordsが使用できる。
  • 取得したshard_iteratorは5分間有効。
  • shard-iterator-type にはAT_SEQUENCE_NUMBER, AFTER_SEQUENCE_NUMBER, TRIM_HORIZON, LATESTのいずれかを指定する。
$ shard_id=$(aws kinesis describe-stream --stream-name $stream_name | jq -r ".StreamDescription.Shards[0].ShardId")
$ shard_iterator=$(aws kinesis get-shard-iterator --stream-name $stream_name --shard-id $shard_id --shard-iterator-type TRIM_HORIZON | jq -r ".ShardIterator")

$ aws kinesis get-records --shard-iterator $shard_iterator --limit 10
{
    "NextShardIterator": "xxx",
    "MillisBehindLatest": 67000,
    "Records": [
        {
            "Data": "aG9nZQ==",
            "PartitionKey": "key1",
            "SequenceNumber": "49551577762526400408247905360155882849256718234908884994"
        },
        {
            "Data": "Zm9v",
            "PartitionKey": "key2",
            "SequenceNumber": "49551577762526400408247905360157091775076333207680974850"
        }
    ]
}

streamを削除

$ aws kinesis delete-stream --stream-name $stream_name

シャードを確認

aws kinesis describe-stream --stream-name $stream_name

シャード分割

  • 隣接しているシャードを分割/結合できる
  • (StartingHashKeyとEndingHashKeyが重なっているシャード)
$ aws kinesis split-shard --stream-name $stream_name --shard-to-split ${splitするシャード} --new-starting-hash-key ${どのhash-keyから分割するか}

シャード結合

$ aws kinesis merge-shards --stream-name $stream_name --shard-to-merge ${mergeするシャード前半} --adjacent-shard-to-merge ${mergeするシャード後半}