LoginSignup
6

More than 5 years have passed since last update.

PythonでKinesis操作する

Posted at

Kinesisストリームとは

大量のデータレコードをリアルタイムで収集し、処理する。
1時間当たり何テラにもなるデータを連続的に取得し保存できる。
データの保存期間は24時間。課金することで7日まで延ばすことができる。

シャード

ストリームのスループットの単位。
シャードを増やすことでそのストリームのスループットを増やすことができる。

データレコード

ストリームに保存されるデータの単位。
レコードは、シーケンス番号、パーティションキー、データBLOBで構成されている。
データBLOBとはデータそのもの。Base64でエンコードされて保存される。

Kinesisの制限

  • 1レコードの容量は1MB

読み取り

どちらかにひっかかるとエラーなります。

  • 1シャードにつき2MB/s
  • 1シャードにつき5トランザクション

書き込み

どちらかにひっかかるとエラーなります。

  • 1シャードにつき1MB/s
  • 1シャードにつき1000トランザクション

PythonでKinesisからget/putする

準備

pipでbotoをインストールする。
AWSでアクセスキーとシークレットキーを取得しておく。
プログラムと同じ階層に.botoとして以下のようなファイルを作成する。

.boto
[Credentials]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY

getする

from boto impor kinesis

# 使用するKinesisのリージョンを指定する
conn = kinesis.connect_to_region(region_name = 'リージョン名')

# 指定したstreamの情報を取得する
stream = conn.describe_stream('ストリーム名')

# さっきとってきたストリームの情報の中からシャード情報を取得する
# シャードが複数ある場合は、リストで返る
shards = stream['StreamDescription']['Shards']

# 全てのシャードから取得する
for shard in shards:
  shard_id = shard['ShardId']

  # 取得する位置を得る
  # 第1引数:ストリーム名
  # 第2引数:シャードid
  # 第3引数:読み取りタイプ
  #         LATEST:シャードの一番最後から読み取る
  #         TRIM_HORIZON:シャードの一番最初から読み取る
  #         AT_SEQUENCE_NUMBER:指定したシーケンス番号から読み取る。第4引数でシーケンス番号を指定
  #         AFTER_SEQUENCE_NUMBER:指定したシーケンス番号の次から読み取る。第4引数でシーケンス番号を指定
  iterator = conn.get_sahrd_iterator(stream['StreamDescription']['StreamName'], shard_id, shard_iterator_type='LATEST')['ShardIterator']

  # Kinesisからデータを取得
  # 第2引数で取得上限指定可能(limit)
  result = conn.get_records(iterator)
  print result

streamをdescribeしたやつをprintするとわかりやすいと思います。

putする

from boto import kinesis


# 使用するKinesisのリージョンを指定する
conn = kinesis.connect_to_region(region_name = 'リージョン名')

# putする
conn.put_record('ストリーム名', data = 'hogehoge', partition_key = 'partition_key')

partition_key

パーティションキーは、データレコードを分離してストリームの異なるシャードにルーティングするために使用される。
ストリームに2つのシャードがある場合、2つのパティションキーを使用し、どのシャードにどのデータを格納するか制御することができる。

参考

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6