初めに
Kinesis Data Streamsなどストリーム系のサービスに連続的に指定したスループットでデータを送信して性能測定をしたくなることはありませんか?
ある業務で実際に性能測定をすることがあったので、Kinesis Agentと独自のPythonプログラムでKinesis Data Streamsにデータを送信する構成を作ってみました。
以下のようにサンプルデータを用意して、Pythonプログラムがそのデータを繰り返しフェッチしログデータとして連続的に書き込んで行きます。Kinesis AgentはそのログデータをフェッチしKinesis Data Streamsに送信します。
セットアップ
セットアップにあたって、Pythonプログラム、Kinesis Agentが動作するEC2インスタンス等を用意しておいてください。また、OSについてはAmazon Linux前提でセットアップ方法を記載します。
Kinesis Agentのインストール
以下のコマンドを実行してKinesis Agentをインストールします。
sudo yum install -y aws-kinesis-agent
Pythonプログラムの配置
以下のPythonプログラムをKinesis Agentがインストールされた環境にファイルとして配置します。(後述の説明の都合上ファイル名は log-generator.py
とします。)
注釈として、このプログラムは data = f'{input_data[icount]},{now}\n'
のコードのところでカンマの後にログが出力される時刻をデータに追加しています(性能測定上便利なため)。
import argparse
import time
import datetime
parser = argparse.ArgumentParser(description='Log generator arguments')
parser.add_argument('--records', type=int)
parser.add_argument('--input', type=str)
parser.add_argument('--output', type=str)
args = parser.parse_args()
def generateData(records, input_data, output_file):
ilen = len(input_data)
icount = 0
with open(output_file , 'w') as f:
while True:
startTime = time.time()
now = datetime.datetime.now().isoformat()
for i in range(records):
data = f'{input_data[icount]},{now}\n'
f.write(data)
icount += 1
if icount >= ilen:
icount = 0
f.flush()
endTime = time.time()
waitTime = 1.0 - (endTime - startTime)
if waitTime <= 0:
print(f'Log generation does not meet the records / sec requirement: {waitTime}')
else:
time.sleep(waitTime)
def readFile(input_file):
with open(input_file , 'r') as f:
b = f.read()
return b.splitlines()
def logGeneration():
records = args.records
input_file = args.input
output_file = args.output
input_data = readFile(input_file)
generateData(records, input_data, output_file)
def main():
logGeneration()
if __name__ == "__main__":
main()
Kinesis Agentの設定
Kinesis Agentの設定ファイル(/etc/aws-kinesis/agent.json
)を開き編集します。
以下が設定例です。
filePattern
はPythonプログラムが出力するファイル名を指定するようにします。
kinesisStream
はKinesis Data Streamsで作成したストリーム名を指定するようにしてください。
maxBufferAgeMillis
はKinesis AgentがKinesis Data Streamsにデータを送信するまでにバッファリングを行う時間です。この例では最小の1000(ms)を指定しています。
{
"flows": [
{
"filePattern": "/tmp/app.log*",
"kinesisStream": "yourkinesisstream",
"maxBufferAgeMillis": 1000
}
]
}
実行方法
まずはKinesis Agentを停止及び起動します。
sudo service aws-kinesis-agent stop
sudo service aws-kinesis-agent start
Pythonプログラムが配置されているディレクトリに移動しPythonプログラムを実行します。
以下が実行例になります。
--records
では秒間でどれだけのレコード数を出力するかの指定をします。実行例では1秒間に1000レコードを出力します。
--input
ではインプットとなるサンプルデータのパスを指定します。
--output
が出力ファイルとなるため、Kinesis Agentがフェッチする対象のファイルになります。
python3 log-generator.py --records 1000 --input /home/ssm-user/sample_data.csv --output /tmp/app.log