実戦で使い始めて1週間ですがfluentdとても便利。そしてexec Output PluginでPythonプログラムにデータを渡してみる。
起動コマンド
ドキュメントにあるPythonの例は次の通り。
<match fizzbuzz>
type exec
command python /path/to/fizzbuzz.py
buffer_path /path/to/buffer_path
format tsv
keys fizzbuzz
flush_interval 5s # for debugging/checking
</match>
が、実際に使う時はvirtualenvで入れたPythonを指定したいし、PYTHONPATHどうなるのこれ、という疑問もあったのでcommand
の指定はこうしてみた。
# pythonはvirtualenvで入れた物を指定
# 自分のコードであるmyappはsetup.pyを書いて予めパッケージインストールしておく
command /path/to/myapp/bin/python −m myapp.command.xxxx arg1
これでPYTHONPATHは気にしなくても良いし、ファイルパスを長々と書かなくても済む。setup.pyは面倒くさがらずに書こう。
環境変数をセットしようとして
# 環境変数を指定 (エラーになる)
command HOGE=FUGA /path/to/myapp/bin/python -m myapp.command.xxx arg1
とすると次のエラーになってしまった。何か渡したい時は引数を使えと。32512なんて初めて見た。
2014-08-03 17:59:54 +0900 [warn]: temporarily failed to flush the buffer. next_retry=2014-08-03 17:59:57 +0900 error_class="RuntimeError" error="command returns 32512: HOGE=FUGA /path/to/myapp/bin/python /path/to/myapp/myapp/command/xxx.py /path/to/buffer_path/.20140803.q4ffb5d85bf81f0d4.log"
実行ファイル
引数の最期にバッファファイルのパスが渡される。1行ずつ読んで処理するコードはこんな感じになる。テストコードから各処理を呼びたい場合はメソッドを分けておく。ファイルの読み込みから最後の処理までストリーム処理にしてあげると綺麗に書けるのと、PyMongoであればジェネレータをそのままCollectionのinsertメソッドに渡せるのでパフォーマンスも良い。
# coding=utf-8
import json
import logging
import logging.config
import traceback
logging.config.fileConfig('logging.conf')
logger = logging.getLogger('fluent-exec-out')
def main():
file_path = parse_args()
do_something(exclude_error_row(parse(readline(file_path))))
def parse_args():
return sys.argv[-1]
def readline(file_path):
with file(file_path) as input:
for line in input:
yield line
def parse(lines):
# 入力フォーマットをjsonとした場合
for line in lines:
yield json.loads(line)
def exclude_error_row(rows):
for row in rows:
# バリデーションとログ出力
if xxxxxx:
logger.warn("Invalid line found %s", row)
continue
yield row
def do_something(rows):
# なにかやる
if __name__ == '__main__':
logger.info('Start')
try:
main()
except:
logger.error(traceback.format_exc())
logger.info('End')
テストコード
mainメソッドのテストはfluentdから渡されるバッファファイルと同じ形式のファイルを作って、引数に渡せば良い。
# coding=utf-8
from nose.tools import ok_, eq_
from myapp.command import xxx
original_argv = sys.argv
class TestAll(object):
def teardown(self):
sys.argv = original_argv
def test_main(self):
sys.argv.append('./tests/data/fluentd_buffer.log')
xxx.main()
# 何かテストする
def test_readline(self):
gen = xxx.readline('./tests/data/fluentd_buffer.log'):
# 何かテストする
ログ出力
fluent-logger-pythonなんかを使って、さらにログをfluentdに渡すとかっこいいかもしれない。
フィルタ処理
上記コードではフィルタ処理もやっているが、exec_filter Output Pluginを使って一つ前のmatchでやるのがfluentd的には綺麗かもしれない。どっちでも出来るので好みの問題かな。
フィルタ処理程度なら一つのexec Outputでまとめてやっても良いと思うが、いくつもの処理を詰め込みすぎると途中で処理がコケたらどうするの、という心配毎が増える。exec Outputを分けて別々のコード呼び出しにした方が良いだろう。