Edited at

AWS CLIを使って初めてAmazon Kinesisを使ってみた

More than 3 years have passed since last update.

Amazon Kinesisを使った事がなかったので、以下のチュートリアルをやってみた時のメモ。

チュートリアル: AWS CLI を使用した Amazon Kinesis の使用開始

また、操作はAWS CLIのみで実施


ストリームを作成してみる

まずはデータをKinesisへ登録するために必要なストリームを作成。

# 作成

$aws kinesis create-stream --stream-name Foo --shard-count 1

# 確認.作成中
$aws kinesis describe-stream --stream-name Foo
{
"StreamDescription": {
"StreamStatus": "CREATING",
"StreamName": "Foo",
"StreamARN": "arn:aws:kinesis:us-west-2:<account i.d.>:stream/Foo",
"Shards": []
}
}

# 作成完了
$aws kinesis describe-stream --stream-name Foo | jq '.StreamDescription.StreamStatus'
"ACTIVE"

# list-streamsでも確認可能
$aws kinesis list-streams
{
"StreamNames": [
"Foo"
]
}


  • Fooというストリームを作成

  • Fooというストリームのshardは1

各shardに1秒間にできる読み取り、書き込み数に制限があるので、スケールさせる場合にはshardを増やす必要がある


データをputしてみる

単一のtestdataというデータをputしてみる。

putの際にpartition-keyを指定する必要がある。この値を元にどのシャードにデータを登録するか決定すしている

# 単一データをputする

$aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49558679583625444449783643462703066989558964096505741314"
}


putしたデータを取得してみる

データを取得するためにまず、シャードイテレーターを取得する。

取得する際にどのデータを取得するかの種類を指定する必要がある。取得できるのは以下の4種類。


  • AT_SEQUENCE_NUMBER->指定のシーケンス番号から取得

  • AFTER_SEQUENCE_NUMBER->指定のシーケンス番号以降から取得

  • TRIM_HORIZON->Shardに有る一番古いデータを取得

  • LATEST->最新のデータからデータ取得

今回の例では--shard-iterator-type TRIM_HORIZIONを指定

# シャードイテレーターを取得

$aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo
{
"ShardIterator": "AAAAAAAAAAFk90gcDWnR35Xac1twQUDVw9m0rrmVRisgW8rJ8256urWImtJd1Kz/wIhtS5gKh6/wRxcp5yZDsBr945a1pbZeTwvKg1OqhIxbvaz6zIByo70xPLkc75CjkE6ueF8Nr/y5EIsQq8vyKQpg+heFW4Ny1CCynpBqT2kt/CEGD0typHdzKZaSwJ+kM9LotFRfzoFv8dq9dRywrGddCQoTfB+g"
}

上記によって先ほどputしたデータのシャードイテレーターが取得できたのでこれを利用して実際のデータを取得する。

$aws kinesis get-records --shard-iterator AAAAAAAAAAE/CPf/KwvtSAHAlfi++4GnlNed8BCks3jfhKy3jf1SEvWpKYhPzy2SCPhmX9eJigyYkp4gyQbLUX/m3UrjmLYj8ydJOKz5GILuUrdzQ5jtYG5froBTCBkLv7wtuNn6heGO3IiAElhjZ1cgsNASYr43KV8YfGBwrM6LBHQCDbkpk3w1ZqsfShb6hV7tIfoH0wAVtpfDJDbgLTwMQe/plrBP

{
"Records": [
{
"Data": "dGVzdGRhdGE=",
"PartitionKey": "123",
"ApproximateArrivalTimestamp": 1454050089.19,
"SequenceNumber": "49558679583625444449783643462703066989558964096505741314"
}
],
"NextShardIterator": "AAAAAAAAAAHPoSgjU+q2jJB5z+tfvCzMM+P7jtIWb4WdQ2nUJVBgDwQzVTlZugv4aOhcdSW6gqTTaap+KSmjabTst6XzuugMa71bbngbVnZeIsyBSNb8RkBsNvoUmDP3w9/6C+oyDnBR2JICuGo6pFWIXJ3eFjJpJoKfKEpvw40cpDx4DO3cZGaMUnvCm9RmHnxnDbHwWdm4i4ydN0K84oix2cvnIPbH",
"MillisBehindLatest": 0
}

データや付随する情報が取得できた。

dGVzdGRhdGE=というのがputしたデータtestdataのBase64エンコードした値となる。

Base64エンコードして値が正しいか確認。

$echo "dGVzdGRhdGE=" | base64 --decode

testdata

ワンライナーでやる場合、以下のような感じで

$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo | jq -r '.ShardIterator' |xargs aws kinesis get-records --shard-iterator | jq -r '.Records[].Data' | base64 --decode

testdata


ストリームの削除

# 削除

$aws kinesis delete-stream --stream-name Foo

# 削除確認
$aws kinesis describe-stream --stream-name Foo

A client error (ResourceNotFoundException) occurred when calling the DescribeStream operation: Stream Foo under account xxxxxxx not found.

# 確認
$aws kinesis list-streams
{
"StreamNames": []
}