やること
- "hogehoge"をひたすらKinesisに投げる
- "hogehoge"が入っているか確認する
じゅんび
- Pythonを使えるようにする
- botoをインストールしておく
- IAMでKinesisをいじれるアカウントを作成しておく
- Kinesisにデータを受け取る口であるStreamとShardを作成する
ちなみに今回はshardの名前は「test」で数は1本としています。
かきこむ
Main.py
#-*- coding: utf-8 -*-
from boto import kinesis
auth = {"aws_access_key_id":"IAMのACCESS KEYを入れてください", "aws_secret_access_key":"IAMのSECRET ACCESS KEYを入れてください"}
if __name__ == '__main__':
# kinesis.connect_to_region('リージョン',IAM認証情報)
Connection = kinesis.connect_to_region('us-east-1',**auth)
while true:
# 書き込む
# Connection.put_record(ストリーム名,書き込むデータ, PartitionKey)
put_response = Connection.put_record('test' , "hogehoge" , 'one')
sleep(10)
これだけ。
みる
shardが増えても処理できるようにshardごとにworkerを取り付けてデータを読むようにしてます。
show.py
# -*- coding: utf-8 -*-
import time
import base64
import multiprocessing
from boto import kinesis
import threading
auth = {"aws_access_key_id":"IAMのACCESS KEYを入れてください", "aws_secret_access_key":"IAMのSECRET ACCESS KEYを入れてください"}
# ストリームの名前
STREAM_NAME='test'
def worker(connect, kinesis_iterator):
next_iterator = kinesis_iterator['ShardIterator']
while True:
response = connect.get_records(next_iterator)
next_iterator = response['NextShardIterator']
time.sleep(1)
# shardに書かれてる内容を表示する
for data in response['Records']:
print(data)
def get_record():
connect = kinesis.connect_to_region('us-east-1',**auth)
stream = connect.describe_stream(STREAM_NAME)
# shardのリストを取得する
shards = stream['StreamDescription']['Shards']
# shardごとにworkerを取り付けてデータを取得する
for shard in shards:
kinesis_iterator = connect.get_shard_iterator(STREAM_NAME, shard['ShardId'], shard_iterator_type='TRIM_HORIZON')
job = threading.Thread(target=worker, args=(connect, kinesis_iterator))
job.start()
if __name__ == '__main__':
get_record()
こんな感じで行けました。