LoginSignup
11
10

More than 5 years have passed since last update.

pythonでKinesisになんか投げて、入ってることを確認する

Last updated at Posted at 2015-02-22

やること

  1. "hogehoge"をひたすらKinesisに投げる
  2. "hogehoge"が入っているか確認する

じゅんび

  1. Pythonを使えるようにする
  2. botoをインストールしておく
  3. IAMでKinesisをいじれるアカウントを作成しておく
  4. Kinesisにデータを受け取る口であるStreamとShardを作成する

ちなみに今回はshardの名前は「test」で数は1本としています。

かきこむ

Main.py
#-*- coding: utf-8 -*-

from boto import kinesis

auth = {"aws_access_key_id":"IAMのACCESS KEYを入れてください", "aws_secret_access_key":"IAMのSECRET ACCESS KEYを入れてください"}

if __name__ == '__main__':
    #            kinesis.connect_to_region('リージョン',IAM認証情報)
    Connection = kinesis.connect_to_region('us-east-1',**auth)
    while true:
        # 書き込む
        #             Connection.put_record(ストリーム名,書き込むデータ, PartitionKey)
        put_response = Connection.put_record('test'    , "hogehoge"  , 'one')
        sleep(10)

これだけ。

みる

shardが増えても処理できるようにshardごとにworkerを取り付けてデータを読むようにしてます。

show.py
# -*- coding: utf-8 -*-

import time
import base64
import multiprocessing
from boto import kinesis
import threading

auth = {"aws_access_key_id":"IAMのACCESS KEYを入れてください", "aws_secret_access_key":"IAMのSECRET ACCESS KEYを入れてください"}

# ストリームの名前
STREAM_NAME='test'

def worker(connect, kinesis_iterator):
    next_iterator = kinesis_iterator['ShardIterator']
    while True:
        response = connect.get_records(next_iterator)
        next_iterator = response['NextShardIterator']
        time.sleep(1)

        # shardに書かれてる内容を表示する
        for data in response['Records']:
            print(data)


def get_record():
    connect = kinesis.connect_to_region('us-east-1',**auth)
    stream = connect.describe_stream(STREAM_NAME)
    # shardのリストを取得する
    shards = stream['StreamDescription']['Shards']

    # shardごとにworkerを取り付けてデータを取得する
    for shard in shards:
        kinesis_iterator = connect.get_shard_iterator(STREAM_NAME, shard['ShardId'], shard_iterator_type='TRIM_HORIZON')
        job = threading.Thread(target=worker, args=(connect, kinesis_iterator))
        job.start()

if __name__ == '__main__':
    get_record()

こんな感じで行けました。

11
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
10