10
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

[JAWS-UG CLI] Kinesis:#2 データレコードの入力 / 取得

Last updated at Posted at 2015-06-18

注意事項

Amazon Kinesis に無料枠はありません。今回のハンズオンでは2つの shard を 2時間弱使用するため、利用料金として10円程度発生しますのでご注意ください。ハンズオン後はストリームの削除をお忘れなくお願いします。

Kinesis の料金 (2015年5月4日現在)
http://aws.amazon.com/jp/kinesis/pricing/

  • $0.0195 /h (1shardあたり)
  • $0.0430 /1,000,000 PutRecords

前提条件

Kinesis への権限

  • Kinesis に対してフル権限があること。

AWS CLI のバージョン

  • 以下のバージョンで動作確認済
    • AWS CLI 1.7.24
コマンド
aws --version
結果(例)
aws-cli/1.7.25 Python/2.7.6 Darwin/14.3.0
  1. 準備
    ===============================

0.1. リージョンの決定

作成するストリームのリージョンを決めます。
(カレントユーザが利用するカレントリージョンも切り変わります。)

コマンド(東京リージョンの場合)
export AWS_DEFAULT_REGION='ap-northeast-1'

0.2. 変数の確認

プロファイルとリージョンが想定のものになっていることを確認します。

変数の確認
aws configure list
結果(例)
      Name                    Value             Type    Location
      ----                    -----             ----    --------
   profile KinesisFull-handsOn-Kinesis-user-5KVYHRGZMYWG              env    AWS_DEFAULT_PROFILE
access_key     ****************HORA shared-credentials-file    
secret_key     ****************F9zU shared-credentials-file    
    region           ap-northeast-1              env    AWS_DEFAULT_REGION

0.3. ストリームの指定

command
aws kinesis list-streams
command
STREAM_NAME='<ストリーム名>'

0.4. ストリームの内容確認

command
aws kinesis describe-stream --stream-name ${STREAM_NAME}
result
{
    "StreamDescription": {
        "StreamStatus": "ACTIVE", 
        "StreamName": "handson", 
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:XXXXXXXXXXXX:stream/handson", 
        "Shards": [
            {
                "ShardId": "shardId-000000000000", 
                "HashKeyRange": {
                    "EndingHashKey": "340282366920938463463374607431768211455", 
                    "StartingHashKey": "0"
                }, 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49550864092651309338540168505480986788282674130572017666"
                }
            }
        ]
    }
}
  1. 事前準備
    ==================

1.1. 入力データの決定

Kinesis のストリーム に送信するデータを決めます。

command
MY_DATA="DATA0001"

1.2. パーティションキーの決定

上記データのパーティションキーを決めます。この文字列によって、どの shard に割り振られるかが決まります (現在は shard の数は1のため必ず同じ shard に割り当てられます)。

command
PARTITION_KEY="JAWS-UG"
  1. データレコードの入力
    ==========================

2.1. データレコードの入力

データレコード(データとパーティションキー)をストーリームに送信します。

変数の確認
cat << ETX

STREAM_NAME: ${STREAM_NAME}
MY_DATA: ${MY_DATA}
PARTITION_KEY: ${PARTITION_KEY}

ETX
command
aws kinesis put-record --stream-name ${STREAM_NAME} --partition-key ${PARTITION_KEY} --data ${MY_DATA}
result
{
    "ShardId": "shardId-000000000000", 
    "SequenceNumber": "49550864092651309338540169253629565960074435278619541506"
}

シーケンスナンバーが付与され、shardId-0 へデータレコードが割り振られたことを確認できました。

  1. データレコードの取得
    ================================

2.1. shardId と イテレータタイプの決定

データを取得する shardId を決めます

command
SHARD_ID=shardId-000000000000

指定した Shard に対して、どの位置からデータを取得するかを決めるため、イテレータタイプを指定します。ここでは、最も古いデータから取得可能な "TRIM_HORIZON" を使います

command
ITERATOR_TYPE="TRIM_HORIZON"

2.2 シャードイテレータ の取得

command
cat << ETX

stream-name         : ${STREAM_NAME}
shard-id            : ${SHARD_ID}
shard-iterator-type : ${ITERATOR_TYPE} 

ETX
command
ITERATOR=`aws kinesis get-shard-iterator --shard-id ${SHARD_ID} --shard-iterator-type ${ITERATOR_TYPE} --stream-name ${STREAM_NAME} | jq -r ".ShardIterator"` && echo "${ITERATOR}"
result
AAAAAAAAAAGHoIrHr76EenUaL/Ltu/hSFVJ5AMKwNYLqStmlmn+Edt1jhEx/+MvPhssy/s8eKZo2zwdw+is3PpuXtM3AbJhy1DAqDjPeUa5I3LmYuwq8eh+NxIeCi1KD7B4kl2h76FPCm0T7kdpwKbr1YIZifJMDNode9ZiQBNdicQKE/tdnFgy6eryHEIMQpQmCXExzEQQ=

2.3 データレコードの取得

command
aws kinesis get-records --shard-iterator ${ITERATOR}
result
{
    "Records": [
        {
            "PartitionKey": "JAWS-UG", 
            "Data": "REFUQTAwMDE=", 
            "SequenceNumber": "49550864092651309338540169253629565960074435278619541506"
        }
    ], 
    "NextShardIterator": "AAAAAAAAAAHSixRwTU8RavU+5EvyTyUgkfrGoVcHaRdQU4UkKfuZ1ZNVJWw7DEK2WqnbBJwmFV/uzCyLqumMx0DZl9j8ckYTnmx+AuGFv78wrtIstuKzy3tCcBpCG4SgsdrveWaFu311h9VVXgcAggK3QzZD17VdOkGl/dclsqo9/DXmf+0soIzxB8YjDyiT9Mj2CyQf85E="
}

"put-record" コマンドでは バイナリデータを送信できるように Base64 エンコーディングを使用しています。そのため "Data" の値は 文字化けのように見えます。下記のようにすることで、デコードすることができます(上記イテレータ取得から5分以内に実行してください)。

command
aws kinesis get-records --shard-iterator ${ITERATOR} | jq -r '.Records[] | "\(.Data)"'  | openssl enc -d -base64
result
DATA0001

  
完了

次に shard の分割を行います。
http://qiita.com/daikumatan/items/9d423aaeb5b00f55645f

10
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?