はじめに
kinesisに入っていくデータを覗き見たかった
kinesisにデータが入ると表示してくれる
kinesisへのフル権限があること
参考
kinesisを作ったりデータをいれたりするのは他を参考にしてください
Kinesisの雰囲気をつかむためにawscliで操作する
[JAWS-UG CLI] Kinesis:#2 データレコードの入力 / 取得
コード
kinesis_get.py
import sys
import time
import boto3
from datetime import datetime
client = boto3.client('kinesis')
def get_kinesis(key):
try:
res = client.get_records(
ShardIterator=key,
Limit=123
)
return res
except Exception as e:
print('Kinesis put record excption')
print(e)
sys.exit
def get_iterator(name, date, id):
t = datetime.strptime(date, '%Y/%m/%d')
res = client.get_shard_iterator(
StreamName=name,
ShardId=id,
ShardIteratorType='AT_TIMESTAMP',
Timestamp=t
)
return res
def main(stream, date, shard_id):
try:
res = get_iterator(stream, date ,shard_id)
itr = res['ShardIterator']
while True:
response = get_kinesis(itr)
for record in response['Records']:
print(record['Data'])
nxt = response['NextShardIterator']
time.sleep(2)
if nxt == None or nxt == '':
break
itr = nxt
except Exception as e:
print('Exception exit')
print(e)
if __name__ == '__main__':
if len(sys.argv) < 3:
error = "ERROR: Please enter stream name and shard id\n"
command = "{} python {} [stream name] [start date %Y/%m/%d] [shard id]".format(error, sys.argv[0])
print(command)
exit()
print('----------start-------------')
stream = sys.argv[1]
date = sys.argv[2]
if len(sys.argv) < 4:
shard_id = 'shardId-000000000000'
else:
shard_id = sys.argv[3]
main(stream, date, shard_id)
使いかた
必要なもの
pip install boto3
コマンド
日付の形式は[%Y/%m/%d]
必須なのはstream nameとstart date
python kinesis_get.py [stream name] [start date] [shard id]
###結果
これでkinesisにデータが挿入されると表示されていく
b'{"a": 1, "b": 2}'
b'{"a": 3, "b": 4}'
b'{"a": 5, "b": 6}'
b'{"a": 7, "b": 8}'