Amazon Kinesisを使った事がなかったので、以下のチュートリアルをやってみた時のメモ。
チュートリアル: AWS CLI を使用した Amazon Kinesis の使用開始
また、操作はAWS CLIのみで実施
ストリームを作成してみる
まずはデータをKinesisへ登録するために必要なストリームを作成。
# 作成
$aws kinesis create-stream --stream-name Foo --shard-count 1
# 確認.作成中
$aws kinesis describe-stream --stream-name Foo
{
"StreamDescription": {
"StreamStatus": "CREATING",
"StreamName": "Foo",
"StreamARN": "arn:aws:kinesis:us-west-2:<account i.d.>:stream/Foo",
"Shards": []
}
}
# 作成完了
$aws kinesis describe-stream --stream-name Foo | jq '.StreamDescription.StreamStatus'
"ACTIVE"
# list-streamsでも確認可能
$aws kinesis list-streams
{
"StreamNames": [
"Foo"
]
}
- Fooというストリームを作成
- Fooというストリームのshardは1
各shardに1秒間にできる読み取り、書き込み数に制限があるので、スケールさせる場合にはshardを増やす必要がある
データをputしてみる
単一のtestdata
というデータをputしてみる。
putの際にpartition-keyを指定する必要がある。この値を元にどのシャードにデータを登録するか決定すしている
# 単一データをputする
$aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49558679583625444449783643462703066989558964096505741314"
}
putしたデータを取得してみる
データを取得するためにまず、シャードイテレーターを取得する。
取得する際にどのデータを取得するかの種類を指定する必要がある。取得できるのは以下の4種類。
- AT_SEQUENCE_NUMBER->指定のシーケンス番号から取得
- AFTER_SEQUENCE_NUMBER->指定のシーケンス番号以降から取得
- TRIM_HORIZON->Shardに有る一番古いデータを取得
- LATEST->最新のデータからデータ取得
今回の例では--shard-iterator-type TRIM_HORIZION
を指定
# シャードイテレーターを取得
$aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo
{
"ShardIterator": "AAAAAAAAAAFk90gcDWnR35Xac1twQUDVw9m0rrmVRisgW8rJ8256urWImtJd1Kz/wIhtS5gKh6/wRxcp5yZDsBr945a1pbZeTwvKg1OqhIxbvaz6zIByo70xPLkc75CjkE6ueF8Nr/y5EIsQq8vyKQpg+heFW4Ny1CCynpBqT2kt/CEGD0typHdzKZaSwJ+kM9LotFRfzoFv8dq9dRywrGddCQoTfB+g"
}
上記によって先ほどputしたデータのシャードイテレーターが取得できたのでこれを利用して実際のデータを取得する。
$aws kinesis get-records --shard-iterator AAAAAAAAAAE/CPf/KwvtSAHAlfi++4GnlNed8BCks3jfhKy3jf1SEvWpKYhPzy2SCPhmX9eJigyYkp4gyQbLUX/m3UrjmLYj8ydJOKz5GILuUrdzQ5jtYG5froBTCBkLv7wtuNn6heGO3IiAElhjZ1cgsNASYr43KV8YfGBwrM6LBHQCDbkpk3w1ZqsfShb6hV7tIfoH0wAVtpfDJDbgLTwMQe/plrBP
{
"Records": [
{
"Data": "dGVzdGRhdGE=",
"PartitionKey": "123",
"ApproximateArrivalTimestamp": 1454050089.19,
"SequenceNumber": "49558679583625444449783643462703066989558964096505741314"
}
],
"NextShardIterator": "AAAAAAAAAAHPoSgjU+q2jJB5z+tfvCzMM+P7jtIWb4WdQ2nUJVBgDwQzVTlZugv4aOhcdSW6gqTTaap+KSmjabTst6XzuugMa71bbngbVnZeIsyBSNb8RkBsNvoUmDP3w9/6C+oyDnBR2JICuGo6pFWIXJ3eFjJpJoKfKEpvw40cpDx4DO3cZGaMUnvCm9RmHnxnDbHwWdm4i4ydN0K84oix2cvnIPbH",
"MillisBehindLatest": 0
}
データや付随する情報が取得できた。
dGVzdGRhdGE=
というのがputしたデータtestdata
のBase64エンコードした値となる。
Base64エンコードして値が正しいか確認。
$echo "dGVzdGRhdGE=" | base64 --decode
testdata
ワンライナーでやる場合、以下のような感じで
$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo | jq -r '.ShardIterator' |xargs aws kinesis get-records --shard-iterator | jq -r '.Records[].Data' | base64 --decode
testdata
ストリームの削除
# 削除
$aws kinesis delete-stream --stream-name Foo
# 削除確認
$aws kinesis describe-stream --stream-name Foo
A client error (ResourceNotFoundException) occurred when calling the DescribeStream operation: Stream Foo under account xxxxxxx not found.
# 確認
$aws kinesis list-streams
{
"StreamNames": []
}