Kinesisストリームとは
大量のデータレコードをリアルタイムで収集し、処理する。
1時間当たり何テラにもなるデータを連続的に取得し保存できる。
データの保存期間は24時間。課金することで7日まで延ばすことができる。
シャード
ストリームのスループットの単位。
シャードを増やすことでそのストリームのスループットを増やすことができる。
データレコード
ストリームに保存されるデータの単位。
レコードは、シーケンス番号、パーティションキー、データBLOBで構成されている。
データBLOBとはデータそのもの。Base64でエンコードされて保存される。
Kinesisの制限
- 1レコードの容量は1MB
読み取り
どちらかにひっかかるとエラーなります。
- 1シャードにつき2MB/s
- 1シャードにつき5トランザクション
書き込み
どちらかにひっかかるとエラーなります。
- 1シャードにつき1MB/s
- 1シャードにつき1000トランザクション
PythonでKinesisからget/putする
準備
pipでbotoをインストールする。
AWSでアクセスキーとシークレットキーを取得しておく。
プログラムと同じ階層に.boto
として以下のようなファイルを作成する。
.boto
[Credentials]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY
getする
from boto impor kinesis
# 使用するKinesisのリージョンを指定する
conn = kinesis.connect_to_region(region_name = 'リージョン名')
# 指定したstreamの情報を取得する
stream = conn.describe_stream('ストリーム名')
# さっきとってきたストリームの情報の中からシャード情報を取得する
# シャードが複数ある場合は、リストで返る
shards = stream['StreamDescription']['Shards']
# 全てのシャードから取得する
for shard in shards:
shard_id = shard['ShardId']
# 取得する位置を得る
# 第1引数:ストリーム名
# 第2引数:シャードid
# 第3引数:読み取りタイプ
# LATEST:シャードの一番最後から読み取る
# TRIM_HORIZON:シャードの一番最初から読み取る
# AT_SEQUENCE_NUMBER:指定したシーケンス番号から読み取る。第4引数でシーケンス番号を指定
# AFTER_SEQUENCE_NUMBER:指定したシーケンス番号の次から読み取る。第4引数でシーケンス番号を指定
iterator = conn.get_sahrd_iterator(stream['StreamDescription']['StreamName'], shard_id, shard_iterator_type='LATEST')['ShardIterator']
# Kinesisからデータを取得
# 第2引数で取得上限指定可能(limit)
result = conn.get_records(iterator)
print result
streamをdescribeしたやつをprintするとわかりやすいと思います。
putする
from boto import kinesis
# 使用するKinesisのリージョンを指定する
conn = kinesis.connect_to_region(region_name = 'リージョン名')
# putする
conn.put_record('ストリーム名', data = 'hogehoge', partition_key = 'partition_key')
partition_key
パーティションキーは、データレコードを分離してストリームの異なるシャードにルーティングするために使用される。
ストリームに2つのシャードがある場合、2つのパティションキーを使用し、どのシャードにどのデータを格納するか制御することができる。