0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Kinesis Data Streamsに指定したスループットでデータを送信する方法

Last updated at Posted at 2022-11-10

初めに

Kinesis Data Streamsなどストリーム系のサービスに連続的に指定したスループットでデータを送信して性能測定をしたくなることはありませんか?
ある業務で実際に性能測定をすることがあったので、Kinesis Agentと独自のPythonプログラムでKinesis Data Streamsにデータを送信する構成を作ってみました。
以下のようにサンプルデータを用意して、Pythonプログラムがそのデータを繰り返しフェッチしログデータとして連続的に書き込んで行きます。Kinesis AgentはそのログデータをフェッチしKinesis Data Streamsに送信します。
log-streaming.png

セットアップ

セットアップにあたって、Pythonプログラム、Kinesis Agentが動作するEC2インスタンス等を用意しておいてください。また、OSについてはAmazon Linux前提でセットアップ方法を記載します。

Kinesis Agentのインストール

以下のコマンドを実行してKinesis Agentをインストールします。

sudo yum install -y aws-kinesis-agent

Pythonプログラムの配置

以下のPythonプログラムをKinesis Agentがインストールされた環境にファイルとして配置します。(後述の説明の都合上ファイル名は log-generator.pyとします。)
注釈として、このプログラムは data = f'{input_data[icount]},{now}\n' のコードのところでカンマの後にログが出力される時刻をデータに追加しています(性能測定上便利なため)。

import argparse
import time
import datetime

parser = argparse.ArgumentParser(description='Log generator arguments')
parser.add_argument('--records', type=int)
parser.add_argument('--input', type=str)
parser.add_argument('--output', type=str)
args = parser.parse_args()

def generateData(records, input_data, output_file):
    ilen = len(input_data)
    icount = 0
    with open(output_file , 'w') as f:
        while True:
            startTime = time.time()
            now = datetime.datetime.now().isoformat()
            for i in range(records):
                data = f'{input_data[icount]},{now}\n'
                f.write(data)
                icount += 1
                if icount >= ilen:
                    icount = 0
            f.flush()
            endTime = time.time()
            waitTime = 1.0 - (endTime - startTime)
            if waitTime <= 0:
                print(f'Log generation does not meet the records / sec requirement: {waitTime}')
            else:
                time.sleep(waitTime)

def readFile(input_file):
    with open(input_file , 'r') as f:
        b = f.read()
        return b.splitlines()

def logGeneration():
    records = args.records
    input_file = args.input
    output_file = args.output

    input_data = readFile(input_file)
    generateData(records, input_data, output_file)

def main():
    logGeneration()

if __name__ == "__main__":
    main()

Kinesis Agentの設定

Kinesis Agentの設定ファイル(/etc/aws-kinesis/agent.json)を開き編集します。
以下が設定例です。
filePatternはPythonプログラムが出力するファイル名を指定するようにします。
kinesisStreamはKinesis Data Streamsで作成したストリーム名を指定するようにしてください。
maxBufferAgeMillisはKinesis AgentがKinesis Data Streamsにデータを送信するまでにバッファリングを行う時間です。この例では最小の1000(ms)を指定しています。

{ 
   "flows": [
        { 
            "filePattern": "/tmp/app.log*", 
            "kinesisStream": "yourkinesisstream",
            "maxBufferAgeMillis": 1000
        } 
   ] 
} 

実行方法

まずはKinesis Agentを停止及び起動します。

sudo service aws-kinesis-agent stop
sudo service aws-kinesis-agent start

Pythonプログラムが配置されているディレクトリに移動しPythonプログラムを実行します。
以下が実行例になります。
--recordsでは秒間でどれだけのレコード数を出力するかの指定をします。実行例では1秒間に1000レコードを出力します。
--inputではインプットとなるサンプルデータのパスを指定します。
--outputが出力ファイルとなるため、Kinesis Agentがフェッチする対象のファイルになります。

python3 log-generator.py --records 1000 --input /home/ssm-user/sample_data.csv --output /tmp/app.log
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?