0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Kinesisを覗く

Posted at

はじめに

kinesisに入っていくデータを覗き見たかった
kinesisにデータが入ると表示してくれる
kinesisへのフル権限があること

参考

kinesisを作ったりデータをいれたりするのは他を参考にしてください
Kinesisの雰囲気をつかむためにawscliで操作する
[JAWS-UG CLI] Kinesis:#2 データレコードの入力 / 取得

コード

kinesis_get.py

import sys
import time
import boto3
from  datetime import datetime




client = boto3.client('kinesis')


def get_kinesis(key):
    try:
        res = client.get_records(
            ShardIterator=key,
            Limit=123
        )
        return res

    except Exception as e:
       print('Kinesis put record excption')
       print(e)
       sys.exit



def get_iterator(name, date, id):
    t = datetime.strptime(date, '%Y/%m/%d')
    res = client.get_shard_iterator(
        StreamName=name,
        ShardId=id,
        ShardIteratorType='AT_TIMESTAMP',
        Timestamp=t
    )
    return res


def main(stream, date, shard_id):

    try:

        res = get_iterator(stream, date ,shard_id)

        itr = res['ShardIterator']

        while True:

            response = get_kinesis(itr)


            for record in response['Records']:
                print(record['Data'])


            nxt = response['NextShardIterator']

            time.sleep(2)
            if nxt == None or nxt == '':

                break

            itr = nxt


    except Exception as e:
        print('Exception exit')
        print(e)




if __name__ == '__main__':

    if len(sys.argv) < 3:
        error = "ERROR: Please enter  stream name and shard id\n"
        command = "{} python {} [stream name] [start date %Y/%m/%d] [shard id]".format(error, sys.argv[0])
        print(command)
        exit()

    print('----------start-------------')



    stream = sys.argv[1]
    date = sys.argv[2]

    if len(sys.argv) < 4:
        shard_id = 'shardId-000000000000'
    else:
        shard_id = sys.argv[3]

    main(stream, date, shard_id)



使いかた

必要なもの

pip install boto3

コマンド

日付の形式は[%Y/%m/%d]

必須なのはstream nameとstart date

python kinesis_get.py [stream name] [start date] [shard id]

###結果
これでkinesisにデータが挿入されると表示されていく

b'{"a": 1, "b": 2}'
b'{"a": 3, "b": 4}'
b'{"a": 5, "b": 6}'
b'{"a": 7, "b": 8}'

GitHub

git hub

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?