exec_filter Output Plugin (out_ecec_filter)は要するにフィルタ処理を任意のプログラムを使ってかませられる。例えば次の処理に不要なレコードを落したり、エラーとしてどこかに保存したり。ここでPythonプログラムを使ってみる。
設定
次の通りにしてみた。
<match hoge>
type exec_filter
command /path/to/python -m myapp.fluent_stream_filter
time_key time
in_format json
out_format msgpack
buffer_type file
buffer_path /path/to/buffer
buffer_chunk_limit 8m
buffer_queue_limit 64
flush_interval 10s
tag fuga
</match>
command
でモジュールとして実行しているのは、importが上手く動作するようにするため。これならPYTHONPATH指定は不要。
ファイルバッファを使っているのは、コードを修正した時にfluendの再起動が必要なため。exec_filter Outputプラグインはexec Outputプラグインと違い、指定したスクリプトは起動後に標準入力の入力待ちとなる。
スクリプト
exec_filter Output Pluginの要求として、スクリプトは標準入力を受けとり、標準出力に結果を吐かなければいけない。
# coding=utf-8
"""
fluentdのout_exec_filterで呼ばれるコード
標準入力から1行ずつ処理を行ない、結果を標準出力に吐く
"""
import json
import logging
import logging.config
import sys
import traceback
import msgpack
logging.config.fileConfig('/path/to/logging.conf')
logger = logging.getLogger('fluent-exec')
logger.propagate = False
def main():
stdin = sys.stdin
output(convert(validate(parse(readline(stdin)))))
def readline(stdin):
for line in stdin:
yield line
def parse(lines):
for line in lines:
yield json.loads(line)
def validate(rows):
""" エラーデータの除去 """
for row in rows:
try:
# なんらかの検査
except Exception, e:
logger.warn(e)
logger.warn(row)
continue
yield row
def convert(rows):
""" なんらかの変換処理 """
for row in rows:
# do something
yield row
def output(rows):
""" 標準出力に吐き出す """
for row in rows:
sys.stdout.write(msgpack.packb(row))
if __name__ == '__main__':
logger.info('Start main')
try:
main()
except Exception, e:
logger.error(traceback.format_exc())
raise e
logger.info('End main')
logging.getLoggerには__name__
を渡すのが常套だが、ここではスクリプト起動されるため内容は '__main__'
である、なので文字列でロガーの名前を指定している。
テストコード
標準入力を扱うメソッドはパラメータでstdioを受けとるようにすると良いかも。テストで標準入力をそのまま使う方法が見つからなかったのでStringIOのインスタンスを渡して、テストデータが読めるようにした。
# coding=utf-8
import types
from StringIO import StringIO
from nose.tools import ok_, eq_
from myapp import fluent_stream_filter
class TestAll(object):
def test_readline(self):
""" readlineのテスト、sys.stdinの代りにStringIOを渡す """
stdin = StringIO()
stdin.write(open('/path/to/test_data.log').read())
stdin.seek(0)
stream = fluent_stream_filter.readline(stdin)
ok_(isinstance(stream, types.GeneratorType))
eq_(stream.next(), u'{1行目の内容}')
def test_parse(self):
""" parse のテスト """
stream = fluent_stream_filter.parse(iter(['{...}', '{...}']))
ok_(isinstance(stream, types.GeneratorType))
eq_(stream.next(), {...})
eq_(stream.next(), {...})
# (略)
ログ出力での注意
標準出力に処理結果以外の、例えばログを吐き出してしまうと、当然MessagePackやJSONとしてパースできないのでfluentd側でエラーになる。rootロガーにConsoleHandlerがセットされていたりして意図せず標準出力にログを吐いてしまう事故を防ぐために
logger.propagate = False
しておくと安全