この記事ではローカルホストやEC2から標準入力でKinesis Streamsにデータを入力するpythonスクリプトを記載します。
#環境
・Amazon linux
・言語:python 2.7、shell
・入力データ形式:CSV
下記にKinesis Streamsに格納するpython scriptを記載する。
このscriptでは500レコード/秒※に集約して格納している。。(集約の意味はKPLを使って1レコードにaggrigationしてるというわけではないので注意してほしい)
※500以上に設定すると「failed to satisfy constraint: Member must have length less than or equal to 500」とエラーが発生する
#script
import sys
import json
import random
import boto3
import time
def create_json(buffered_data, streamname):
jdat = {}
dat = []
jdat["StreamName"] = streamname
for rec in buffered_data :
dat.append({"Data" : rec, "PartitionKey" : str(random.randint(1,1000))})
jdat["Records"] = dat
return jdat
if __name__ == '__main__':
args = sys.argv
streamname=args[1]
cnt = 0
buf = []
client = boto3.client('kinesis')
while 1:
if len(buf) == 500:
ret = client.put_records(**create_json(buf,streamname ))
time.sleep(1)
print ret
buf = []
line = sys.stdin.readline()
if not line:
break
buf.append(line[:-1])
上記scriptではaccess-keyやsecret-keyのcredential情報を設定していないため、必要に応じて、client()内で設定してほしい。詳細は下記boto3ドキュメントを見て頂きたい。 http://boto3.readthedocs.io/en/latest/guide/configuration.html
実行は下記の通りです。
・Streamsのkinesis_streams_testが作成しておくこと
・test.csvというデータを入力
>実行
cat test.csv | python buffer_insert.py kinesis_streams_test
>結果
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265378143459129484557577879554'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888401204447079899259962654738'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265379352384949099186752585730'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265380561310768713815927291906'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265381770236588328445101998082'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265382979162407943074276704258'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888402413372899513889137360914'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888403622298719128518312067090'}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'e8d1fc47-17eb-1f03-b9e4-17c6595f9a22'}}
Kinesis Streamsに格納できたかどうかを確認するスクリプトを下記に記載。(横着して、pythonではなくAWS CLIからですが。。。pythonから取得することはもちろんできます。)
#!/bin/bash
stream_name=$1
shard_array_index=${2:-0}
shard_iterator_type=${3:-TRIM_HORIZON}
shard_id=$(aws kinesis describe-stream --stream-name ${stream_name} --query "StreamDescription.Shards[${shard_array_index}].ShardId" --output text)
echo $shard_id
shard_iterator=$(aws kinesis get-shard-iterator --stream-name ${stream_name} --shard-id ${shard_id} --shard-iterator-type ${shard_iterator_type} --query "ShardIterator" --output text)
echo $shard_iterator
aws kinesis get-records --shard-iterator ${shard_iterator}
put_recordされたデータはBase64 エンコーディングされているため、consumer側でデコード処理を行う必要がある。AWS CLIではbase64のサポートを行っていないため、Base64 デコーダ(https://www.base64decode.org/ など)を使用する必要がある。
次回はKPLを使ってデータ集約し、Kinesis Streamsに格納するスクリプトを紹介します。