LoginSignup
21
16

More than 5 years have passed since last update.

AWS CLIを使って初めてAmazon Kinesisを使ってみた

Last updated at Posted at 2016-01-29

Amazon Kinesisを使った事がなかったので、以下のチュートリアルをやってみた時のメモ。

チュートリアル: AWS CLI を使用した Amazon Kinesis の使用開始

また、操作はAWS CLIのみで実施

ストリームを作成してみる

まずはデータをKinesisへ登録するために必要なストリームを作成。

# 作成
$aws kinesis create-stream --stream-name Foo --shard-count 1

# 確認.作成中
$aws kinesis describe-stream --stream-name Foo
{
    "StreamDescription": {
        "StreamStatus": "CREATING",
        "StreamName": "Foo",
        "StreamARN": "arn:aws:kinesis:us-west-2:<account i.d.>:stream/Foo",
        "Shards": []
    }
}


# 作成完了
$aws kinesis describe-stream --stream-name Foo | jq '.StreamDescription.StreamStatus'
"ACTIVE"

# list-streamsでも確認可能
$aws kinesis list-streams
{
    "StreamNames": [
        "Foo"
    ]
}
  • Fooというストリームを作成
  • Fooというストリームのshardは1

各shardに1秒間にできる読み取り、書き込み数に制限があるので、スケールさせる場合にはshardを増やす必要がある

データをputしてみる

単一のtestdataというデータをputしてみる。

putの際にpartition-keyを指定する必要がある。この値を元にどのシャードにデータを登録するか決定すしている

# 単一データをputする
$aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49558679583625444449783643462703066989558964096505741314"
}

putしたデータを取得してみる

データを取得するためにまず、シャードイテレーターを取得する。
取得する際にどのデータを取得するかの種類を指定する必要がある。取得できるのは以下の4種類。

  • AT_SEQUENCE_NUMBER->指定のシーケンス番号から取得
  • AFTER_SEQUENCE_NUMBER->指定のシーケンス番号以降から取得
  • TRIM_HORIZON->Shardに有る一番古いデータを取得
  • LATEST->最新のデータからデータ取得

今回の例では--shard-iterator-type TRIM_HORIZIONを指定

# シャードイテレーターを取得
$aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo
{
    "ShardIterator": "AAAAAAAAAAFk90gcDWnR35Xac1twQUDVw9m0rrmVRisgW8rJ8256urWImtJd1Kz/wIhtS5gKh6/wRxcp5yZDsBr945a1pbZeTwvKg1OqhIxbvaz6zIByo70xPLkc75CjkE6ueF8Nr/y5EIsQq8vyKQpg+heFW4Ny1CCynpBqT2kt/CEGD0typHdzKZaSwJ+kM9LotFRfzoFv8dq9dRywrGddCQoTfB+g"
}

上記によって先ほどputしたデータのシャードイテレーターが取得できたのでこれを利用して実際のデータを取得する。

$aws kinesis get-records --shard-iterator AAAAAAAAAAE/CPf/KwvtSAHAlfi++4GnlNed8BCks3jfhKy3jf1SEvWpKYhPzy2SCPhmX9eJigyYkp4gyQbLUX/m3UrjmLYj8ydJOKz5GILuUrdzQ5jtYG5froBTCBkLv7wtuNn6heGO3IiAElhjZ1cgsNASYr43KV8YfGBwrM6LBHQCDbkpk3w1ZqsfShb6hV7tIfoH0wAVtpfDJDbgLTwMQe/plrBP
{
    "Records": [
        {
            "Data": "dGVzdGRhdGE=",
            "PartitionKey": "123",
            "ApproximateArrivalTimestamp": 1454050089.19,
            "SequenceNumber": "49558679583625444449783643462703066989558964096505741314"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAHPoSgjU+q2jJB5z+tfvCzMM+P7jtIWb4WdQ2nUJVBgDwQzVTlZugv4aOhcdSW6gqTTaap+KSmjabTst6XzuugMa71bbngbVnZeIsyBSNb8RkBsNvoUmDP3w9/6C+oyDnBR2JICuGo6pFWIXJ3eFjJpJoKfKEpvw40cpDx4DO3cZGaMUnvCm9RmHnxnDbHwWdm4i4ydN0K84oix2cvnIPbH",
    "MillisBehindLatest": 0
}

データや付随する情報が取得できた。
dGVzdGRhdGE=というのがputしたデータtestdataのBase64エンコードした値となる。

Base64エンコードして値が正しいか確認。

$echo "dGVzdGRhdGE=" | base64 --decode
testdata

ワンライナーでやる場合、以下のような感じで

$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo | jq -r '.ShardIterator' |xargs aws kinesis get-records --shard-iterator | jq -r '.Records[].Data' | base64 --decode
testdata

ストリームの削除

# 削除
$aws kinesis delete-stream --stream-name Foo

# 削除確認
$aws kinesis describe-stream --stream-name Foo

A client error (ResourceNotFoundException) occurred when calling the DescribeStream operation: Stream Foo under account xxxxxxx not found.

# 確認
$aws kinesis list-streams
{
    "StreamNames": []
}
21
16
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
21
16