Help us understand the problem. What is going on with this article?

AWS CLIを使って初めてAmazon Kinesisを使ってみた

More than 3 years have passed since last update.

Amazon Kinesisを使った事がなかったので、以下のチュートリアルをやってみた時のメモ。

チュートリアル: AWS CLI を使用した Amazon Kinesis の使用開始

また、操作はAWS CLIのみで実施

ストリームを作成してみる

まずはデータをKinesisへ登録するために必要なストリームを作成。

# 作成
$aws kinesis create-stream --stream-name Foo --shard-count 1

# 確認.作成中
$aws kinesis describe-stream --stream-name Foo
{
    "StreamDescription": {
        "StreamStatus": "CREATING",
        "StreamName": "Foo",
        "StreamARN": "arn:aws:kinesis:us-west-2:<account i.d.>:stream/Foo",
        "Shards": []
    }
}


# 作成完了
$aws kinesis describe-stream --stream-name Foo | jq '.StreamDescription.StreamStatus'
"ACTIVE"

# list-streamsでも確認可能
$aws kinesis list-streams
{
    "StreamNames": [
        "Foo"
    ]
}
  • Fooというストリームを作成
  • Fooというストリームのshardは1

各shardに1秒間にできる読み取り、書き込み数に制限があるので、スケールさせる場合にはshardを増やす必要がある

データをputしてみる

単一のtestdataというデータをputしてみる。

putの際にpartition-keyを指定する必要がある。この値を元にどのシャードにデータを登録するか決定すしている

# 単一データをputする
$aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49558679583625444449783643462703066989558964096505741314"
}

putしたデータを取得してみる

データを取得するためにまず、シャードイテレーターを取得する。
取得する際にどのデータを取得するかの種類を指定する必要がある。取得できるのは以下の4種類。

  • AT_SEQUENCE_NUMBER->指定のシーケンス番号から取得
  • AFTER_SEQUENCE_NUMBER->指定のシーケンス番号以降から取得
  • TRIM_HORIZON->Shardに有る一番古いデータを取得
  • LATEST->最新のデータからデータ取得

今回の例では--shard-iterator-type TRIM_HORIZIONを指定

# シャードイテレーターを取得
$aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo
{
    "ShardIterator": "AAAAAAAAAAFk90gcDWnR35Xac1twQUDVw9m0rrmVRisgW8rJ8256urWImtJd1Kz/wIhtS5gKh6/wRxcp5yZDsBr945a1pbZeTwvKg1OqhIxbvaz6zIByo70xPLkc75CjkE6ueF8Nr/y5EIsQq8vyKQpg+heFW4Ny1CCynpBqT2kt/CEGD0typHdzKZaSwJ+kM9LotFRfzoFv8dq9dRywrGddCQoTfB+g"
}

上記によって先ほどputしたデータのシャードイテレーターが取得できたのでこれを利用して実際のデータを取得する。

$aws kinesis get-records --shard-iterator AAAAAAAAAAE/CPf/KwvtSAHAlfi++4GnlNed8BCks3jfhKy3jf1SEvWpKYhPzy2SCPhmX9eJigyYkp4gyQbLUX/m3UrjmLYj8ydJOKz5GILuUrdzQ5jtYG5froBTCBkLv7wtuNn6heGO3IiAElhjZ1cgsNASYr43KV8YfGBwrM6LBHQCDbkpk3w1ZqsfShb6hV7tIfoH0wAVtpfDJDbgLTwMQe/plrBP
{
    "Records": [
        {
            "Data": "dGVzdGRhdGE=",
            "PartitionKey": "123",
            "ApproximateArrivalTimestamp": 1454050089.19,
            "SequenceNumber": "49558679583625444449783643462703066989558964096505741314"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAHPoSgjU+q2jJB5z+tfvCzMM+P7jtIWb4WdQ2nUJVBgDwQzVTlZugv4aOhcdSW6gqTTaap+KSmjabTst6XzuugMa71bbngbVnZeIsyBSNb8RkBsNvoUmDP3w9/6C+oyDnBR2JICuGo6pFWIXJ3eFjJpJoKfKEpvw40cpDx4DO3cZGaMUnvCm9RmHnxnDbHwWdm4i4ydN0K84oix2cvnIPbH",
    "MillisBehindLatest": 0
}

データや付随する情報が取得できた。
dGVzdGRhdGE=というのがputしたデータtestdataのBase64エンコードした値となる。

Base64エンコードして値が正しいか確認。

$echo "dGVzdGRhdGE=" | base64 --decode
testdata

ワンライナーでやる場合、以下のような感じで

$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo | jq -r '.ShardIterator' |xargs aws kinesis get-records --shard-iterator | jq -r '.Records[].Data' | base64 --decode
testdata

ストリームの削除

# 削除
$aws kinesis delete-stream --stream-name Foo

# 削除確認
$aws kinesis describe-stream --stream-name Foo

A client error (ResourceNotFoundException) occurred when calling the DescribeStream operation: Stream Foo under account xxxxxxx not found.

# 確認
$aws kinesis list-streams
{
    "StreamNames": []
}
toshihirock
こちらは個人の意見で会社とは関係ありません。お約束です。
http://toshihirock.blogspot.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away