LoginSignup
9
7

More than 5 years have passed since last update.

fluentdのexec_filter Output PluginでPythonプログラムを使う

Last updated at Posted at 2014-11-21

exec_filter Output Plugin (out_ecec_filter)は要するにフィルタ処理を任意のプログラムを使ってかませられる。例えば次の処理に不要なレコードを落したり、エラーとしてどこかに保存したり。ここでPythonプログラムを使ってみる。

設定

次の通りにしてみた。

<match hoge>
  type       exec_filter
  command    /path/to/python -m myapp.fluent_stream_filter

  time_key   time
  in_format  json
  out_format msgpack

  buffer_type file
  buffer_path /path/to/buffer
  buffer_chunk_limit 8m
  buffer_queue_limit 64
  flush_interval 10s

  tag        fuga
</match>

commandでモジュールとして実行しているのは、importが上手く動作するようにするため。これならPYTHONPATH指定は不要。

ファイルバッファを使っているのは、コードを修正した時にfluendの再起動が必要なため。exec_filter Outputプラグインはexec Outputプラグインと違い、指定したスクリプトは起動後に標準入力の入力待ちとなる。

スクリプト

exec_filter Output Pluginの要求として、スクリプトは標準入力を受けとり、標準出力に結果を吐かなければいけない。

fluent_stream_filter.py
# coding=utf-8
"""
fluentdのout_exec_filterで呼ばれるコード

標準入力から1行ずつ処理を行ない、結果を標準出力に吐く
"""
import json
import logging
import logging.config
import sys
import traceback

import msgpack

logging.config.fileConfig('/path/to/logging.conf')
logger = logging.getLogger('fluent-exec')
logger.propagate = False

def main():
    stdin = sys.stdin
    output(convert(validate(parse(readline(stdin)))))

def readline(stdin):
    for line in stdin:
        yield line

def parse(lines):
    for line in lines:
        yield json.loads(line)

def validate(rows):
    """ エラーデータの除去 """
    for row in rows:
        try:
             # なんらかの検査
        except Exception, e:
            logger.warn(e)
            logger.warn(row)
            continue
        yield row

def convert(rows):
    """ なんらかの変換処理 """
    for row in rows:
        # do something
        yield row

def output(rows):
    """ 標準出力に吐き出す """
    for row in rows:
        sys.stdout.write(msgpack.packb(row))

if __name__ == '__main__':
    logger.info('Start main')
    try:
        main()
    except Exception, e:
        logger.error(traceback.format_exc())
        raise e
    logger.info('End main')

logging.getLoggerには__name__を渡すのが常套だが、ここではスクリプト起動されるため内容は '__main__' である、なので文字列でロガーの名前を指定している。

テストコード

標準入力を扱うメソッドはパラメータでstdioを受けとるようにすると良いかも。テストで標準入力をそのまま使う方法が見つからなかったのでStringIOのインスタンスを渡して、テストデータが読めるようにした。

# coding=utf-8
import types
from StringIO import StringIO

from nose.tools import ok_, eq_

from myapp import fluent_stream_filter


class TestAll(object):
    def test_readline(self):
        """ readlineのテスト、sys.stdinの代りにStringIOを渡す """
        stdin = StringIO()
        stdin.write(open('/path/to/test_data.log').read())
        stdin.seek(0)

        stream = fluent_stream_filter.readline(stdin)
        ok_(isinstance(stream, types.GeneratorType))
        eq_(stream.next(), u'{1行目の内容}')

    def test_parse(self):
        """ parse のテスト """
        stream = fluent_stream_filter.parse(iter(['{...}', '{...}']))
        ok_(isinstance(stream, types.GeneratorType))
        eq_(stream.next(), {...})
        eq_(stream.next(), {...})

    # (略)

ログ出力での注意

標準出力に処理結果以外の、例えばログを吐き出してしまうと、当然MessagePackやJSONとしてパースできないのでfluentd側でエラーになる。rootロガーにConsoleHandlerがセットされていたりして意図せず標準出力にログを吐いてしまう事故を防ぐために

logger.propagate = False

しておくと安全

9
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
7