LoginSignup
5
2

More than 5 years have passed since last update.

標準入力でAmazon Kinesis StreamsにCSVデータを格納する方法

Last updated at Posted at 2017-03-05

この記事ではローカルホストやEC2から標準入力でKinesis Streamsにデータを入力するpythonスクリプトを記載します。

環境

・Amazon linux
・言語:python 2.7、shell
・入力データ形式:CSV

下記にKinesis Streamsに格納するpython scriptを記載する。
このscriptでは500レコード/秒※に集約して格納している。。(集約の意味はKPLを使って1レコードにaggrigationしてるというわけではないので注意してほしい)
※500以上に設定すると「failed to satisfy constraint: Member must have length less than or equal to 500」とエラーが発生する

script

buffer_insert.py
import sys
import json
import random
import boto3
import time

def create_json(buffered_data, streamname):
    jdat = {}
    dat = []
    jdat["StreamName"] = streamname      
    for rec in buffered_data :
        dat.append({"Data" : rec, "PartitionKey" : str(random.randint(1,1000))})
    jdat["Records"] = dat
    return jdat

if __name__ == '__main__':

  args = sys.argv
  streamname=args[1]

  cnt = 0
  buf = []

  client = boto3.client('kinesis')
  while 1:
      if len(buf) == 500:
          ret = client.put_records(**create_json(buf,streamname ))
          time.sleep(1)
          print ret
          buf = []
      line = sys.stdin.readline()
      if not line:
          break
      buf.append(line[:-1])

上記scriptではaccess-keyやsecret-keyのcredential情報を設定していないため、必要に応じて、client()内で設定してほしい。詳細は下記boto3ドキュメントを見て頂きたい。 http://boto3.readthedocs.io/en/latest/guide/configuration.html

実行は下記の通りです。
・Streamsのkinesis_streams_testが作成しておくこと
・test.csvというデータを入力

>実行
cat test.csv | python buffer_insert.py kinesis_streams_test

>結果
{u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265378143459129484557577879554'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888401204447079899259962654738'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265379352384949099186752585730'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265380561310768713815927291906'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265381770236588328445101998082'}, {u'ShardId': u'shardId-000000000000', u'SequenceNumber': u'49571046543942031937439475265382979162407943074276704258'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888402413372899513889137360914'}, {u'ShardId': u'shardId-000000000001', u'SequenceNumber': u'49571046543964332682638005888403622298719128518312067090'}], 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': 'e8d1fc47-17eb-1f03-b9e4-17c6595f9a22'}}

Kinesis Streamsに格納できたかどうかを確認するスクリプトを下記に記載。(横着して、pythonではなくAWS CLIからですが。。。pythonから取得することはもちろんできます。)

get_record.sh
#!/bin/bash

stream_name=$1
shard_array_index=${2:-0}
shard_iterator_type=${3:-TRIM_HORIZON}


shard_id=$(aws kinesis describe-stream --stream-name ${stream_name} --query "StreamDescription.Shards[${shard_array_index}].ShardId" --output text)
echo $shard_id
shard_iterator=$(aws kinesis get-shard-iterator --stream-name ${stream_name} --shard-id ${shard_id} --shard-iterator-type ${shard_iterator_type} --query "ShardIterator" --output text)
echo $shard_iterator
aws kinesis get-records --shard-iterator ${shard_iterator}

put_recordされたデータはBase64 エンコーディングされているため、consumer側でデコード処理を行う必要がある。AWS CLIではbase64のサポートを行っていないため、Base64 デコーダ(https://www.base64decode.org/ など)を使用する必要がある。

次回はKPLを使ってデータ集約し、Kinesis Streamsに格納するスクリプトを紹介します。

5
2
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2