Help us understand the problem. What is going on with this article?

[JAWS-UG CLI] Kinesis:#4 複数データレコードの一括入力 / 取得

More than 3 years have passed since last update.

複数データレコードが記述されたJSONファイル作成し、一括でデータを put します。個別にデータレコードを送信するときに比べ、オーバーヘッドを減らし、効率的にデータを送信することができます。リアルタイムにデータを可視化したい場合などに適しています。

注意事項

Amazon Kinesis に無料枠はありません。今回のハンズオンでは2つの shard を 2時間弱使用するため、利用料金として10円程度発生しますのでご注意ください。ハンズオン後はストリームの削除をお忘れなくお願いします。

Kinesis の料金 (2015年5月4日現在)
http://aws.amazon.com/jp/kinesis/pricing/

  • $0.0195 /h (1shardあたり)
  • $0.0430 /1,000,000 PutRecords

前提条件

Kinesis への権限

  • Kinesis に対してフル権限があること。

AWS CLI のバージョン

  • 以下のバージョンで動作確認済
    • AWS CLI 1.7.24
コマンド
aws --version
結果(例)
aws-cli/1.7.25 Python/2.7.6 Darwin/14.3.0

0. 準備

0.1. リージョンの決定

作成するストリームのリージョンを決めます。
(カレントユーザが利用するカレントリージョンも切り変わります。)

コマンド(東京リージョンの場合)
export AWS_DEFAULT_REGION='ap-northeast-1'

0.2. 変数の確認

プロファイルとリージョンが想定のものになっていることを確認します。

変数の確認
aws configure list
結果(例)
      Name                    Value             Type    Location
      ----                    -----             ----    --------
   profile KinesisFull-handsOn-Kinesis-user-5KVYHRGZMYWG              env    AWS_DEFAULT_PROFILE
access_key     ****************HORA shared-credentials-file    
secret_key     ****************F9zU shared-credentials-file    
    region           ap-northeast-1              env    AWS_DEFAULT_REGION

0.3. ストリームの指定

command
aws kinesis list-streams
command
STREAM_NAME='<ストリーム名>'

0.4. ストリームの確認

command
aws kinesis describe-stream --stream-name ${STREAM_NAME}
result
{
    "StreamDescription": {
        "StreamStatus": "ACTIVE", 
        "StreamName": "handson", 
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:XXXXXXXXXXXX:stream/handson", 
        "Shards": [
            {
                "ShardId": "shardId-000000000000", 
                "HashKeyRange": {
                    "EndingHashKey": "340282366920938463463374607431768211455", 
                    "StartingHashKey": "0"
                }, 
                "SequenceNumberRange": {
                    "EndingSequenceNumber": "49550274379963104000400348362601688132548853773472104450", 
                    "StartingSequenceNumber": "49550274379951953627801083051032129199232032484024647682"
                }
            }, 
            {
                "ShardId": "shardId-000000000001", 
                "HashKeyRange": {
                    "EndingHashKey": "170141183460469231731687303715884105727", 
                    "StartingHashKey": "0"
                }, 
                "ParentShardId": "shardId-000000000000", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49550274960507253381148795294631482991086939375801466898"
                }
            }, 
            {
                "ShardId": "shardId-000000000002", 
                "HashKeyRange": {
                    "EndingHashKey": "340282366920938463463374607431768211455", 
                    "StartingHashKey": "170141183460469231731687303715884105728"
                }, 
                "ParentShardId": "shardId-000000000000", 
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49550274960529554126347325917773018709359587737307447330"
                }
            }
        ]
    }
}

1. 事前準備

複数のデータレコードが記述されたJSONファイルを作成します。1つのレコードに対して、少なくともデータとパーティションキーをセットに送信します。

1.1. パーティションキーの設計

 実際の利用時は、どの shardId も負荷が均等になるように 「パーティションキーのハッシュ値はランダムで付与するほうがベター」です。しかし今回は、Kinesis の動作の理解のため、自らパーティションキーを設計し、データが期待通りの shardId に振り分けられることを確認します。

 前セクションで shardId-0 を半分で分割したため、shardId-1, shardId-2 は、ハッシュ値 2^128/2 (=2^127) で分割されています。

最初に、shardId-1とshardId-2を分割ポイントを明示します

comand
POINT_SEPARATE='2^127'

次に、分割した2つの shard にデータレコードをうまく振り分けられるようなパーティションキーを見つけます。

パーティションキーの準備 (shardId-1用)

"Strawberry" の MD5 ハッシュ値が、shardId-1 のパーティションキーとして使えるか確認します。

下記コマンドは、次のように動作します。

  • shardId-1用
    • ハッシュ値 < ${POINT_SEPARATE}
    • 結果: "LessThan" を表示
  • shardId-2用
    • ハッシュ値 > ${POINT_SEPARATE}
    • 結果: "GreaterEqual" を表示

つまり shardId-1 用では "LessThan" と表示されればOKです!

Mac OS X, BSD系の場合

command
KEY_NAME='Strawberry'
RESULT_COMPARE=$( if [ $( echo "$(printf "%039s\n" "$( echo "obase=10;ibase=16; $( echo -n "${KEY_NAME}" | MD5 | tr a-z A-Z )" | bc )") >= ${POINT_SEPARATE}" | bc ) -eq 1 ]; then echo 'GreaterEqual'; else echo 'LessThan'; fi ) && echo ${RESULT_COMPARE}
result
LessThan

Linux系の場合

command
KEY_NAME='Strawberry'
RESULT_COMPARE=$( if [ $( echo "$(printf "%039s\n" "$( echo "obase=10;ibase=16; $( echo -n "${KEY_NAME}" | md5sum | cut -c1-32 | tr a-z A-Z )" | bc )") >= ${POINT_SEPARATE}" | bc ) -eq 1 ]; then echo 'GreaterEqual'; else echo 'LessThan'; fi ) && echo ${RESULT_COMPARE}
result
LessThan

"Strawberry"はshardId-1のパティションキーとして使えることを確認できました。

パーティションキーの準備 (shardId-2用)

同様に、"Apple" の MD5 ハッシュ値を調べ、shardId-2 のパーティションキーとして使えるか確認します。"GreaterEqual" という結果が返ってくればOKです。

Mac OS X, BSD系の場合

command
KEY_NAME='Apple'
RESULT_COMPARE=$( if [ $( echo "$(printf "%039s\n" "$( echo "obase=10;ibase=16; $( echo -n "${KEY_NAME}" | MD5 | tr a-z A-Z )" | bc )") >= ${POINT_SEPARATE}" | bc ) -eq 1 ]; then echo 'GreaterEqual'; else echo 'LessThan'; fi ) && echo ${RESULT_COMPARE}
result
GreaterEqual

Linux系の場合

command
KEY_NAME='Apple'
RESULT_COMPARE=$( if [ $( echo "$(printf "%039s\n" "$( echo "obase=10;ibase=16; $( echo -n "${KEY_NAME}" | md5sum | cut -c1-32 | tr a-z A-Z )" | bc )") >= ${POINT_SEPARATE}" | bc ) -eq 1 ]; then echo 'GreaterEqual'; else echo 'LessThan'; fi ) && echo ${RESULT_COMPARE}
result
GreaterEqual

"Apple" は shardId-2 のパティションキーとして使えることを確認できました。

1.2. データレコードファイル名の決定

保存するJSONファイルの名前を決めます。

command
DATAFILE=myHandsOndata.json

1.3. 複数のデータが記述されたJSONファイルの作成

 パーティションキー、"Strawberry" "Apple" を用いて、複数のデータが記述されたJSONファイルを作成します。注意点としては、put-records で送信するデータは、4文字, 8文字, 12文字・・と4の倍数になっている必要があります。今回は"DATA0001"のように8文字にしています。

変数の確認
cat << ETX

DATAFILE : ${DATAFILE}

ETX
command
cat << EOF > ${DATAFILE}
{
  "StreamName": "handson",
  "Records": [
    {
      "Data": "DATA0000",
      "PartitionKey": "Strawberry"
    },
    {
      "Data": "DATA0001",
      "PartitionKey": "Strawberry"
    },
    {
      "Data": "DATA0002",
      "PartitionKey": "Strawberry"
    },
    {
      "Data": "DATA0003",
      "PartitionKey": "Apple"
    },
    {
      "Data": "DATA0004",
      "PartitionKey": "Apple"
    },
    {
      "Data": "DATA0005",
      "PartitionKey": "Apple"
    },
    {
      "Data": "DATA0006",
      "PartitionKey": "Strawberry"
    },
    {
      "Data": "DATA0007",
      "PartitionKey": "Strawberry"
    }
  ]
}
EOF

1.4. 作成したJSONファイルの確認

command
cat ${DATAFILE} | json_verify
result
JSON is valid

2. JSONファイルを用いた複数データレコードの一括入力

2.1. 複数レコードの一括送信

作成したjsonファイルの内容を kinesis に putします。

command
aws kinesis put-records --cli-input-json file://${DATAFILE}
result
{
    "FailedRecordCount": 0, 
    "Records": [
        {
            "ShardId": "shardId-000000000001", 
            "SequenceNumber": "49550864601487412533413396725901470615300484160378896402"
        }, 
        {
            "ShardId": "shardId-000000000001", 
            "SequenceNumber": "49550864601487412533413396725902679541120098789553602578"
        }, 
        {
            "ShardId": "shardId-000000000001", 
            "SequenceNumber": "49550864601487412533413396725903888466939713418728308754"
        }, 
        {
            "ShardId": "shardId-000000000002", 
            "SequenceNumber": "49550864601509713278611927349049050962671205667758407714"
        }, 
        {
            "ShardId": "shardId-000000000002", 
            "SequenceNumber": "49550864601509713278611927349050259888490820296933113890"
        }, 
        {
            "ShardId": "shardId-000000000002", 
            "SequenceNumber": "49550864601509713278611927349051468814310434926107820066"
        }, 
        {
            "ShardId": "shardId-000000000001", 
            "SequenceNumber": "49550864601487412533413396725905097392759328047903014930"
        }, 
        {
            "ShardId": "shardId-000000000001", 
            "SequenceNumber": "49550864601487412533413396725906306318578942677077721106"
        }
    ]
}

一度に8つのレコードが put され、"Strawberry" のパーティションキーが shardId-1 に、"Apple"のパーティションキー が shardId-2, に割り振られたことを確認できました。

3. データレコードの取得 (shardId-1)

分割して生成された shardId-1に対してレコードを取得します。

3.1 shardId と イテレータタイプの決定

shardId を決定します。

command
SHARD_ID=shardId-000000000001

イテレータタイプを指定します。ここでは、最も古いデータから取得可能な "TRIM_HORIZON" を指定します。

command
ITERATOR_TYPE="TRIM_HORIZON"

3.2 シャードイテレータ の取得

command
cat << ETX

stream-name         : ${STREAM_NAME}
shard-id            : ${SHARD_ID}
shard-iterator-type : ${ITERATOR_TYPE} 

ETX
command
ITERATOR=`aws kinesis get-shard-iterator --shard-id ${SHARD_ID} --shard-iterator-type ${ITERATOR_TYPE} --stream-name ${STREAM_NAME} | jq -r ".ShardIterator"` && echo "${ITERATOR}"
result(例)
AAAAAAAAAAGM4T3K0RNzgB8W3wUIRGay3UZVOgt3eLiulSpzvjKetPsAWCeV9grwIOz7o10Pk5opBihVfMjg7lWIybRbU11p+IEqFtXVuaDUqnDaOW4MOqCh3yyN8B+Nha+GPdIws2yl8v4iQZhbg7YsqsyGm85xGJxRys69+pvCm8rcUv7n6vDan5aq5MJLGpalal+HPkE=

3.3 データレコードの取得

 パーティションキーが "Strawberry" のデータレコードのみが取得できていることを確認します

command
aws kinesis get-records --shard-iterator ${ITERATOR}
result
{
    "Records": [
        {
            "PartitionKey": "Strawberry", 
            "Data": "REFUQTAwMDA=", 
            "SequenceNumber": "49550864601487412533413396725901470615300484160378896402"
        }, 
        {
            "PartitionKey": "Strawberry", 
            "Data": "REFUQTAwMDE=", 
            "SequenceNumber": "49550864601487412533413396725902679541120098789553602578"
        }, 
        {
            "PartitionKey": "Strawberry", 
            "Data": "REFUQTAwMDI=", 
            "SequenceNumber": "49550864601487412533413396725903888466939713418728308754"
        }, 
        {
            "PartitionKey": "Strawberry", 
            "Data": "REFUQTAwMDY=", 
            "SequenceNumber": "49550864601487412533413396725905097392759328047903014930"
        }, 
        {
            "PartitionKey": "Strawberry", 
            "Data": "REFUQTAwMDc=", 
            "SequenceNumber": "49550864601487412533413396725906306318578942677077721106"
        }
    ], 
    "NextShardIterator": "AAAAAAAAAAGOXSV7p3FA1rVz5cntdo4h55ViRLUwVdnSQSR6j9XIm7A8phgcH5PusUTneZDn+EuRWG2dj3iZZiOKYV0CHyEYBusLTsKP6kQx9oPInq6MJtHHJZu44kRQQ25+/CjHliipneu7PRpkXnl3xlgncjE/SnAG5W4QOjNSmg76BEYw2PR29tUJCmbUC7aQ3p8t2fU="
}

4. データレコードの取得 (shardId-2)(時間がない場合は飛ばしてOK)

shardId-2 に対しても同様にデータレコードを取得します。

4.1. shardId の決定

command
SHARD_ID=shardId-000000000002

4.2. イテレータの取得

command
cat << ETX

stream-name         : ${STREAM_NAME}
shard-id            : ${SHARD_ID}
shard-iterator-type : ${ITERATOR_TYPE} 

ETX
command
ITERATOR=`aws kinesis get-shard-iterator --shard-id ${SHARD_ID} --shard-iterator-type ${ITERATOR_TYPE} --stream-name ${STREAM_NAME} | jq -r ".ShardIterator"` && echo "${ITERATOR}"
result(例)
AAAAAAAAAAGn1dlBpY4iRB+GD/V/IcDulmRIZSm6fcvyW9yefWYiwi2a0HUSNIX7H1C8+D4+eyBkvcpyP/xn79nnvH1fnthChlRfwWSx1RhEmulNZV69xRPBBwo5JITosG501PzY4eaI4ZdXolgRtng5cAFgCM1BCCq30265JoDQK3qiGKQErRWBm6Csvs2lfjlbwpOHAnA=

4.3. データレコードの取得

shardId-2では、次の作業の準備も兼ねて、取得した結果をファイルに保存します。

ファイル名をきめます。

commnad
TMP_RECORDS=getRecords.json
command
aws kinesis get-records --shard-iterator ${ITERATOR} > ${TMP_RECORDS} && cat ${TMP_RECORDS}
result
{
    "Records": [
        {
            "PartitionKey": "Apple", 
            "Data": "REFUQTAwMDM=", 
            "SequenceNumber": "49550864601509713278611927349049050962671205667758407714"
        }, 
        {
            "PartitionKey": "Apple", 
            "Data": "REFUQTAwMDQ=", 
            "SequenceNumber": "49550864601509713278611927349050259888490820296933113890"
        }, 
        {
            "PartitionKey": "Apple", 
            "Data": "REFUQTAwMDU=", 
            "SequenceNumber": "49550864601509713278611927349051468814310434926107820066"
        }
    ], 
    "NextShardIterator": "AAAAAAAAAAH6lWTnGgU6srPK2qQOz0I6qwQk6pTHIM2+wYh5FKjCuvaH82So50jBWgKO3VAJ+s8pArwMe3Bp+TmFcZ8UTV4gYzoTez8wi4m2Z0Y6j6d+0m3GLwFFM25hfyZ2gfClLO9wDrJkuMOMKMZv5EHT7u3uYpxu9nc6gKHKBwBrIlfOqtE61xnUmM0Th0oUaBWOWeM="
}

5. 指定したシーケンス番号からのレコードの取得(時間がない場合は飛ばしてOK)

shardId-2を使って、指定したシーケンス番号からデータを取得してみます

5.1. イテレータタイプの決定

command
ITER_TYPE="AT_SEQUENCE_NUMBER"

5.2. シーケンス番号の取得

今回は先ほど保存したファイルからシーケンス番号を読み取ります。
下から2番目のシーケンスナンバーを取得します

command
SEQ_NUM=`cat ${TMP_RECORDS} | jq -r ".Records[] | .SequenceNumber" | tail -2 | head -1` && echo ${SEQ_NUM}
result(例)
49550864601509713278611927349050259888490820296933113890

5.3 イテレータの取得

変数の確認
cat << ETX

SHARD_ID    : ${SHARD_ID}
ITER_TYPE   : ${ITER_TYPE}
SEQ_NUM     : ${SEQ_NUM}
STREAM_NAME : ${STREAM_NAME}

ETX
command
ITERATOR=`aws kinesis get-shard-iterator --shard-id ${SHARD_ID} --shard-iterator-type ${ITER_TYPE} --starting-sequence-number ${SEQ_NUM} --stream-name ${STREAM_NAME} --query 'ShardIterator'` && echo "${ITERATOR}"
result
"AAAAAAAAAAE/ywKQ9bQnwdXqRt97c/rmbkgBe/RNV2qb24SpeizpGIX4tGb5Rh5kZF64o8b5VOkX/GuhZjPiJckn/fpZU9nBLD4/lOhQCc3aK4U6SI4qN8SETRaUgazCoJ7BjgU00G/YkqOr+X45yVCOv3ygUnxA4K1DIBPycQ4W8I9PDMnZ6Q/1kPSHa/cHwys1eoDEbx4="

5.4 レコードの取得

レコードを取得します。2つのレコードが取得できていることを確認します

command
aws kinesis get-records --shard-iterator ${ITERATOR}
result
{
    "Records": [
        {
            "PartitionKey": "Apple", 
            "Data": "REFUQTAwMDQ=", 
            "SequenceNumber": "49550864601509713278611927349050259888490820296933113890"
        }, 
        {
            "PartitionKey": "Apple", 
            "Data": "REFUQTAwMDU=", 
            "SequenceNumber": "49550864601509713278611927349051468814310434926107820066"
        }
    ], 
    "NextShardIterator": "AAAAAAAAAAHAgvtX6HEG2zLb1ykCtNVkkGnr85ZV1F6TFQt41zkPmXVNl3Cyw0/qZlUmp5DTKp/HWuHOW3YETP3SSTMk0iGUkUE9tiNeVDyvE/6cjFd/jIJQJMkSbUeZfgv80KAEz1Gbr4dNixAvTfu01F/ML2f37gQaGUIdpHfy23jDnDP/4Uf3LcDxG0wrt74nWBWBfK0="
}

完了

分割した shard に対して 複数のレコードを入力し、パーティションキーに応じて、自動的に レコードが分割されることを確認できました。また、指定したシーケンス番号からのデータレコード読み込みも確認できました。次に、分割した shard をマージします。
http://qiita.com/daikumatan/items/7c36aaa124260e521c93

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away