複数データレコードが記述されたJSONファイル作成し、一括でデータを put します。個別にデータレコードを送信するときに比べ、オーバーヘッドを減らし、効率的にデータを送信することができます。リアルタイムにデータを可視化したい場合などに適しています。
注意事項
Amazon Kinesis に無料枠はありません。今回のハンズオンでは2つの shard を 2時間弱使用するため、利用料金として10円程度発生しますのでご注意ください。ハンズオン後はストリームの削除をお忘れなくお願いします。
Kinesis の料金 (2015年5月4日現在)
http://aws.amazon.com/jp/kinesis/pricing/
- $0.0195 /h (1shardあたり)
- $0.0430 /1,000,000 PutRecords
前提条件
Kinesis への権限
- Kinesis に対してフル権限があること。
AWS CLI のバージョン
- 以下のバージョンで動作確認済
- AWS CLI 1.7.24
aws --version
aws-cli/1.7.25 Python/2.7.6 Darwin/14.3.0
- 準備
===============================
0.1. リージョンの決定
作成するストリームのリージョンを決めます。
(カレントユーザが利用するカレントリージョンも切り変わります。)
export AWS_DEFAULT_REGION='ap-northeast-1'
0.2. 変数の確認
プロファイルとリージョンが想定のものになっていることを確認します。
aws configure list
Name Value Type Location
---- ----- ---- --------
profile KinesisFull-handsOn-Kinesis-user-5KVYHRGZMYWG env AWS_DEFAULT_PROFILE
access_key ****************HORA shared-credentials-file
secret_key ****************F9zU shared-credentials-file
region ap-northeast-1 env AWS_DEFAULT_REGION
0.3. ストリームの指定
aws kinesis list-streams
STREAM_NAME='<ストリーム名>'
0.4. ストリームの確認
aws kinesis describe-stream --stream-name ${STREAM_NAME}
{
"StreamDescription": {
"StreamStatus": "ACTIVE",
"StreamName": "handson",
"StreamARN": "arn:aws:kinesis:ap-northeast-1:XXXXXXXXXXXX:stream/handson",
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"EndingHashKey": "340282366920938463463374607431768211455",
"StartingHashKey": "0"
},
"SequenceNumberRange": {
"EndingSequenceNumber": "49550274379963104000400348362601688132548853773472104450",
"StartingSequenceNumber": "49550274379951953627801083051032129199232032484024647682"
}
},
{
"ShardId": "shardId-000000000001",
"HashKeyRange": {
"EndingHashKey": "170141183460469231731687303715884105727",
"StartingHashKey": "0"
},
"ParentShardId": "shardId-000000000000",
"SequenceNumberRange": {
"StartingSequenceNumber": "49550274960507253381148795294631482991086939375801466898"
}
},
{
"ShardId": "shardId-000000000002",
"HashKeyRange": {
"EndingHashKey": "340282366920938463463374607431768211455",
"StartingHashKey": "170141183460469231731687303715884105728"
},
"ParentShardId": "shardId-000000000000",
"SequenceNumberRange": {
"StartingSequenceNumber": "49550274960529554126347325917773018709359587737307447330"
}
}
]
}
}
- 事前準備
===================
複数のデータレコードが記述されたJSONファイルを作成します。1つのレコードに対して、少なくともデータとパーティションキーをセットに送信します。
1.1. パーティションキーの設計
実際の利用時は、どの shardId も負荷が均等になるように __「パーティションキーのハッシュ値はランダムで付与するほうがベター」__です。しかし今回は、Kinesis の動作の理解のため、自らパーティションキーを設計し、データが期待通りの shardId に振り分けられることを確認します。
前セクションで shardId-0 を半分で分割したため、shardId-1, shardId-2 は、ハッシュ値 2^128/2 (=2^127) で分割されています。
最初に、shardId-1とshardId-2を分割ポイントを明示します
POINT_SEPARATE='2^127'
次に、分割した2つの shard にデータレコードをうまく振り分けられるようなパーティションキーを見つけます。
パーティションキーの準備 (shardId-1用)
"Strawberry" の MD5 ハッシュ値が、shardId-1 のパーティションキーとして使えるか確認します。
下記コマンドは、次のように動作します。
- shardId-1用
- ハッシュ値 < ${POINT_SEPARATE}
- 結果: "LessThan" を表示
- shardId-2用
- ハッシュ値 > ${POINT_SEPARATE}
- 結果: "GreaterEqual" を表示
つまり shardId-1 用では "LessThan" と表示されればOKです!
Mac OS X, BSD系の場合
KEY_NAME='Strawberry'
RESULT_COMPARE=$( if [ $( echo "$(printf "%039s\n" "$( echo "obase=10;ibase=16; $( echo -n "${KEY_NAME}" | MD5 | tr a-z A-Z )" | bc )") >= ${POINT_SEPARATE}" | bc ) -eq 1 ]; then echo 'GreaterEqual'; else echo 'LessThan'; fi ) && echo ${RESULT_COMPARE}
LessThan
Linux系の場合
KEY_NAME='Strawberry'
RESULT_COMPARE=$( if [ $( echo "$(printf "%039s\n" "$( echo "obase=10;ibase=16; $( echo -n "${KEY_NAME}" | md5sum | cut -c1-32 | tr a-z A-Z )" | bc )") >= ${POINT_SEPARATE}" | bc ) -eq 1 ]; then echo 'GreaterEqual'; else echo 'LessThan'; fi ) && echo ${RESULT_COMPARE}
LessThan
"Strawberry"はshardId-1のパティションキーとして使えることを確認できました。
パーティションキーの準備 (shardId-2用)
同様に、"Apple" の MD5 ハッシュ値を調べ、shardId-2 のパーティションキーとして使えるか確認します。"GreaterEqual" という結果が返ってくればOKです。
Mac OS X, BSD系の場合
KEY_NAME='Apple'
RESULT_COMPARE=$( if [ $( echo "$(printf "%039s\n" "$( echo "obase=10;ibase=16; $( echo -n "${KEY_NAME}" | MD5 | tr a-z A-Z )" | bc )") >= ${POINT_SEPARATE}" | bc ) -eq 1 ]; then echo 'GreaterEqual'; else echo 'LessThan'; fi ) && echo ${RESULT_COMPARE}
GreaterEqual
Linux系の場合
KEY_NAME='Apple'
RESULT_COMPARE=$( if [ $( echo "$(printf "%039s\n" "$( echo "obase=10;ibase=16; $( echo -n "${KEY_NAME}" | md5sum | cut -c1-32 | tr a-z A-Z )" | bc )") >= ${POINT_SEPARATE}" | bc ) -eq 1 ]; then echo 'GreaterEqual'; else echo 'LessThan'; fi ) && echo ${RESULT_COMPARE}
GreaterEqual
"Apple" は shardId-2 のパティションキーとして使えることを確認できました。
1.2. データレコードファイル名の決定
保存するJSONファイルの名前を決めます。
DATAFILE=myHandsOndata.json
1.3. 複数のデータが記述されたJSONファイルの作成
パーティションキー、"Strawberry" "Apple" を用いて、複数のデータが記述されたJSONファイルを作成します。注意点としては、put-records
で送信するデータは、4文字, 8文字, 12文字・・と4の倍数になっている必要があります。今回は"DATA0001"のように8文字にしています。
cat << ETX
DATAFILE : ${DATAFILE}
ETX
cat << EOF > ${DATAFILE}
{
"StreamName": "handson",
"Records": [
{
"Data": "DATA0000",
"PartitionKey": "Strawberry"
},
{
"Data": "DATA0001",
"PartitionKey": "Strawberry"
},
{
"Data": "DATA0002",
"PartitionKey": "Strawberry"
},
{
"Data": "DATA0003",
"PartitionKey": "Apple"
},
{
"Data": "DATA0004",
"PartitionKey": "Apple"
},
{
"Data": "DATA0005",
"PartitionKey": "Apple"
},
{
"Data": "DATA0006",
"PartitionKey": "Strawberry"
},
{
"Data": "DATA0007",
"PartitionKey": "Strawberry"
}
]
}
EOF
1.4. 作成したJSONファイルの確認
cat ${DATAFILE} | json_verify
JSON is valid
- JSONファイルを用いた複数データレコードの一括入力
=====================================
2.1. 複数レコードの一括送信
作成したjsonファイルの内容を kinesis に putします。
aws kinesis put-records --cli-input-json file://${DATAFILE}
{
"FailedRecordCount": 0,
"Records": [
{
"ShardId": "shardId-000000000001",
"SequenceNumber": "49550864601487412533413396725901470615300484160378896402"
},
{
"ShardId": "shardId-000000000001",
"SequenceNumber": "49550864601487412533413396725902679541120098789553602578"
},
{
"ShardId": "shardId-000000000001",
"SequenceNumber": "49550864601487412533413396725903888466939713418728308754"
},
{
"ShardId": "shardId-000000000002",
"SequenceNumber": "49550864601509713278611927349049050962671205667758407714"
},
{
"ShardId": "shardId-000000000002",
"SequenceNumber": "49550864601509713278611927349050259888490820296933113890"
},
{
"ShardId": "shardId-000000000002",
"SequenceNumber": "49550864601509713278611927349051468814310434926107820066"
},
{
"ShardId": "shardId-000000000001",
"SequenceNumber": "49550864601487412533413396725905097392759328047903014930"
},
{
"ShardId": "shardId-000000000001",
"SequenceNumber": "49550864601487412533413396725906306318578942677077721106"
}
]
}
一度に8つのレコードが put され、"Strawberry" のパーティションキーが shardId-1 に、"Apple"のパーティションキー が shardId-2, に割り振られたことを確認できました。
- データレコードの取得 (shardId-1)
====================================
分割して生成された shardId-1に対してレコードを取得します。
3.1 shardId と イテレータタイプの決定
shardId を決定します。
SHARD_ID=shardId-000000000001
イテレータタイプを指定します。ここでは、最も古いデータから取得可能な "TRIM_HORIZON" を指定します。
ITERATOR_TYPE="TRIM_HORIZON"
3.2 シャードイテレータ の取得
cat << ETX
stream-name : ${STREAM_NAME}
shard-id : ${SHARD_ID}
shard-iterator-type : ${ITERATOR_TYPE}
ETX
ITERATOR=`aws kinesis get-shard-iterator --shard-id ${SHARD_ID} --shard-iterator-type ${ITERATOR_TYPE} --stream-name ${STREAM_NAME} | jq -r ".ShardIterator"` && echo "${ITERATOR}"
AAAAAAAAAAGM4T3K0RNzgB8W3wUIRGay3UZVOgt3eLiulSpzvjKetPsAWCeV9grwIOz7o10Pk5opBihVfMjg7lWIybRbU11p+IEqFtXVuaDUqnDaOW4MOqCh3yyN8B+Nha+GPdIws2yl8v4iQZhbg7YsqsyGm85xGJxRys69+pvCm8rcUv7n6vDan5aq5MJLGpalal+HPkE=
3.3 データレコードの取得
パーティションキーが "Strawberry" のデータレコードのみが取得できていることを確認します
aws kinesis get-records --shard-iterator ${ITERATOR}
{
"Records": [
{
"PartitionKey": "Strawberry",
"Data": "REFUQTAwMDA=",
"SequenceNumber": "49550864601487412533413396725901470615300484160378896402"
},
{
"PartitionKey": "Strawberry",
"Data": "REFUQTAwMDE=",
"SequenceNumber": "49550864601487412533413396725902679541120098789553602578"
},
{
"PartitionKey": "Strawberry",
"Data": "REFUQTAwMDI=",
"SequenceNumber": "49550864601487412533413396725903888466939713418728308754"
},
{
"PartitionKey": "Strawberry",
"Data": "REFUQTAwMDY=",
"SequenceNumber": "49550864601487412533413396725905097392759328047903014930"
},
{
"PartitionKey": "Strawberry",
"Data": "REFUQTAwMDc=",
"SequenceNumber": "49550864601487412533413396725906306318578942677077721106"
}
],
"NextShardIterator": "AAAAAAAAAAGOXSV7p3FA1rVz5cntdo4h55ViRLUwVdnSQSR6j9XIm7A8phgcH5PusUTneZDn+EuRWG2dj3iZZiOKYV0CHyEYBusLTsKP6kQx9oPInq6MJtHHJZu44kRQQ25+/CjHliipneu7PRpkXnl3xlgncjE/SnAG5W4QOjNSmg76BEYw2PR29tUJCmbUC7aQ3p8t2fU="
}
- データレコードの取得 (shardId-2)(時間がない場合は飛ばしてOK)
==========================================
shardId-2 に対しても同様にデータレコードを取得します。
4.1. shardId の決定
SHARD_ID=shardId-000000000002
4.2. イテレータの取得
cat << ETX
stream-name : ${STREAM_NAME}
shard-id : ${SHARD_ID}
shard-iterator-type : ${ITERATOR_TYPE}
ETX
ITERATOR=`aws kinesis get-shard-iterator --shard-id ${SHARD_ID} --shard-iterator-type ${ITERATOR_TYPE} --stream-name ${STREAM_NAME} | jq -r ".ShardIterator"` && echo "${ITERATOR}"
AAAAAAAAAAGn1dlBpY4iRB+GD/V/IcDulmRIZSm6fcvyW9yefWYiwi2a0HUSNIX7H1C8+D4+eyBkvcpyP/xn79nnvH1fnthChlRfwWSx1RhEmulNZV69xRPBBwo5JITosG501PzY4eaI4ZdXolgRtng5cAFgCM1BCCq30265JoDQK3qiGKQErRWBm6Csvs2lfjlbwpOHAnA=
4.3. データレコードの取得
shardId-2では、次の作業の準備も兼ねて、取得した結果をファイルに保存します。
ファイル名をきめます。
TMP_RECORDS=getRecords.json
aws kinesis get-records --shard-iterator ${ITERATOR} > ${TMP_RECORDS} && cat ${TMP_RECORDS}
{
"Records": [
{
"PartitionKey": "Apple",
"Data": "REFUQTAwMDM=",
"SequenceNumber": "49550864601509713278611927349049050962671205667758407714"
},
{
"PartitionKey": "Apple",
"Data": "REFUQTAwMDQ=",
"SequenceNumber": "49550864601509713278611927349050259888490820296933113890"
},
{
"PartitionKey": "Apple",
"Data": "REFUQTAwMDU=",
"SequenceNumber": "49550864601509713278611927349051468814310434926107820066"
}
],
"NextShardIterator": "AAAAAAAAAAH6lWTnGgU6srPK2qQOz0I6qwQk6pTHIM2+wYh5FKjCuvaH82So50jBWgKO3VAJ+s8pArwMe3Bp+TmFcZ8UTV4gYzoTez8wi4m2Z0Y6j6d+0m3GLwFFM25hfyZ2gfClLO9wDrJkuMOMKMZv5EHT7u3uYpxu9nc6gKHKBwBrIlfOqtE61xnUmM0Th0oUaBWOWeM="
}
- 指定したシーケンス番号からのレコードの取得(時間がない場合は飛ばしてOK)
======================================
shardId-2を使って、指定したシーケンス番号からデータを取得してみます
5.1. イテレータタイプの決定
ITER_TYPE="AT_SEQUENCE_NUMBER"
5.2. シーケンス番号の取得
今回は先ほど保存したファイルからシーケンス番号を読み取ります。
下から2番目のシーケンスナンバーを取得します
SEQ_NUM=`cat ${TMP_RECORDS} | jq -r ".Records[] | .SequenceNumber" | tail -2 | head -1` && echo ${SEQ_NUM}
49550864601509713278611927349050259888490820296933113890
5.3 イテレータの取得
cat << ETX
SHARD_ID : ${SHARD_ID}
ITER_TYPE : ${ITER_TYPE}
SEQ_NUM : ${SEQ_NUM}
STREAM_NAME : ${STREAM_NAME}
ETX
ITERATOR=`aws kinesis get-shard-iterator --shard-id ${SHARD_ID} --shard-iterator-type ${ITER_TYPE} --starting-sequence-number ${SEQ_NUM} --stream-name ${STREAM_NAME} --query 'ShardIterator'` && echo "${ITERATOR}"
"AAAAAAAAAAE/ywKQ9bQnwdXqRt97c/rmbkgBe/RNV2qb24SpeizpGIX4tGb5Rh5kZF64o8b5VOkX/GuhZjPiJckn/fpZU9nBLD4/lOhQCc3aK4U6SI4qN8SETRaUgazCoJ7BjgU00G/YkqOr+X45yVCOv3ygUnxA4K1DIBPycQ4W8I9PDMnZ6Q/1kPSHa/cHwys1eoDEbx4="
5.4 レコードの取得
レコードを取得します。2つのレコードが取得できていることを確認します
aws kinesis get-records --shard-iterator ${ITERATOR}
{
"Records": [
{
"PartitionKey": "Apple",
"Data": "REFUQTAwMDQ=",
"SequenceNumber": "49550864601509713278611927349050259888490820296933113890"
},
{
"PartitionKey": "Apple",
"Data": "REFUQTAwMDU=",
"SequenceNumber": "49550864601509713278611927349051468814310434926107820066"
}
],
"NextShardIterator": "AAAAAAAAAAHAgvtX6HEG2zLb1ykCtNVkkGnr85ZV1F6TFQt41zkPmXVNl3Cyw0/qZlUmp5DTKp/HWuHOW3YETP3SSTMk0iGUkUE9tiNeVDyvE/6cjFd/jIJQJMkSbUeZfgv80KAEz1Gbr4dNixAvTfu01F/ML2f37gQaGUIdpHfy23jDnDP/4Uf3LcDxG0wrt74nWBWBfK0="
}
完了
分割した shard に対して 複数のレコードを入力し、パーティションキーに応じて、自動的に レコードが分割されることを確認できました。また、指定したシーケンス番号からのデータレコード読み込みも確認できました。次に、分割した shard をマージします。
http://qiita.com/daikumatan/items/7c36aaa124260e521c93