注意事項
Amazon Kinesis に無料枠はありません。今回のハンズオンでは2つの shard を 2時間弱使用するため、利用料金として10円程度発生しますのでご注意ください。ハンズオン後はストリームの削除をお忘れなくお願いします。
Kinesis の料金 (2015年5月4日現在)
http://aws.amazon.com/jp/kinesis/pricing/
- $0.0195 /h (1shardあたり)
- $0.0430 /1,000,000 PutRecords
前提条件
Kinesis への権限
- Kinesis に対してフル権限があること。
AWS CLI のバージョン
- 以下のバージョンで動作確認済
- AWS CLI 1.7.24
aws --version
aws-cli/1.7.25 Python/2.7.6 Darwin/14.3.0
- 準備
===============================
0.1. リージョンの決定
作成するストリームのリージョンを決めます。
(カレントユーザが利用するカレントリージョンも切り変わります。)
export AWS_DEFAULT_REGION='ap-northeast-1'
0.2. 変数の確認
プロファイルとリージョンが想定のものになっていることを確認します。
aws configure list
Name Value Type Location
---- ----- ---- --------
profile KinesisFull-handsOn-Kinesis-user-5KVYHRGZMYWG env AWS_DEFAULT_PROFILE
access_key ****************HORA shared-credentials-file
secret_key ****************F9zU shared-credentials-file
region ap-northeast-1 env AWS_DEFAULT_REGION
0.3. ストリームの指定
aws kinesis list-streams
STREAM_NAME='<ストリーム名>'
0.4. ストリームの内容確認
aws kinesis describe-stream --stream-name ${STREAM_NAME}
{
"StreamDescription": {
"StreamStatus": "ACTIVE",
"StreamName": "handson",
"StreamARN": "arn:aws:kinesis:ap-northeast-1:XXXXXXXXXXXX:stream/handson",
"Shards": [
{
"ShardId": "shardId-000000000000",
"HashKeyRange": {
"EndingHashKey": "340282366920938463463374607431768211455",
"StartingHashKey": "0"
},
"SequenceNumberRange": {
"StartingSequenceNumber": "49550864092651309338540168505480986788282674130572017666"
}
}
]
}
}
- 事前準備
==================
1.1. 入力データの決定
Kinesis のストリーム に送信するデータを決めます。
MY_DATA="DATA0001"
1.2. パーティションキーの決定
上記データのパーティションキーを決めます。この文字列によって、どの shard に割り振られるかが決まります (現在は shard の数は1のため必ず同じ shard に割り当てられます)。
PARTITION_KEY="JAWS-UG"
- データレコードの入力
==========================
2.1. データレコードの入力
データレコード(データとパーティションキー)をストーリームに送信します。
cat << ETX
STREAM_NAME: ${STREAM_NAME}
MY_DATA: ${MY_DATA}
PARTITION_KEY: ${PARTITION_KEY}
ETX
aws kinesis put-record --stream-name ${STREAM_NAME} --partition-key ${PARTITION_KEY} --data ${MY_DATA}
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49550864092651309338540169253629565960074435278619541506"
}
シーケンスナンバーが付与され、shardId-0 へデータレコードが割り振られたことを確認できました。
- データレコードの取得
================================
2.1. shardId と イテレータタイプの決定
データを取得する shardId を決めます
SHARD_ID=shardId-000000000000
指定した Shard に対して、どの位置からデータを取得するかを決めるため、イテレータタイプを指定します。ここでは、最も古いデータから取得可能な "TRIM_HORIZON" を使います
ITERATOR_TYPE="TRIM_HORIZON"
2.2 シャードイテレータ の取得
cat << ETX
stream-name : ${STREAM_NAME}
shard-id : ${SHARD_ID}
shard-iterator-type : ${ITERATOR_TYPE}
ETX
ITERATOR=`aws kinesis get-shard-iterator --shard-id ${SHARD_ID} --shard-iterator-type ${ITERATOR_TYPE} --stream-name ${STREAM_NAME} | jq -r ".ShardIterator"` && echo "${ITERATOR}"
AAAAAAAAAAGHoIrHr76EenUaL/Ltu/hSFVJ5AMKwNYLqStmlmn+Edt1jhEx/+MvPhssy/s8eKZo2zwdw+is3PpuXtM3AbJhy1DAqDjPeUa5I3LmYuwq8eh+NxIeCi1KD7B4kl2h76FPCm0T7kdpwKbr1YIZifJMDNode9ZiQBNdicQKE/tdnFgy6eryHEIMQpQmCXExzEQQ=
2.3 データレコードの取得
aws kinesis get-records --shard-iterator ${ITERATOR}
{
"Records": [
{
"PartitionKey": "JAWS-UG",
"Data": "REFUQTAwMDE=",
"SequenceNumber": "49550864092651309338540169253629565960074435278619541506"
}
],
"NextShardIterator": "AAAAAAAAAAHSixRwTU8RavU+5EvyTyUgkfrGoVcHaRdQU4UkKfuZ1ZNVJWw7DEK2WqnbBJwmFV/uzCyLqumMx0DZl9j8ckYTnmx+AuGFv78wrtIstuKzy3tCcBpCG4SgsdrveWaFu311h9VVXgcAggK3QzZD17VdOkGl/dclsqo9/DXmf+0soIzxB8YjDyiT9Mj2CyQf85E="
}
"put-record" コマンドでは バイナリデータを送信できるように Base64 エンコーディングを使用しています。そのため "Data" の値は 文字化けのように見えます。下記のようにすることで、デコードすることができます(上記イテレータ取得から5分以内に実行してください)。
aws kinesis get-records --shard-iterator ${ITERATOR} | jq -r '.Records[] | "\(.Data)"' | openssl enc -d -base64
DATA0001
完了
次に shard の分割を行います。
http://qiita.com/daikumatan/items/9d423aaeb5b00f55645f