LoginSignup
14
14

More than 5 years have passed since last update.

fluentdのexec Output PluginでPythonプログラムを使う

Last updated at Posted at 2014-08-03

実戦で使い始めて1週間ですがfluentdとても便利。そしてexec Output PluginでPythonプログラムにデータを渡してみる。

起動コマンド

ドキュメントにあるPythonの例は次の通り。

td-agent.conf
<match fizzbuzz>
  type exec
  command python /path/to/fizzbuzz.py
  buffer_path    /path/to/buffer_path
  format tsv
  keys fizzbuzz
  flush_interval 5s # for debugging/checking
</match>

が、実際に使う時はvirtualenvで入れたPythonを指定したいし、PYTHONPATHどうなるのこれ、という疑問もあったのでcommandの指定はこうしてみた。

td-agent.conf
  # pythonはvirtualenvで入れた物を指定
  # 自分のコードであるmyappはsetup.pyを書いて予めパッケージインストールしておく
  command /path/to/myapp/bin/python −m myapp.command.xxxx arg1

これでPYTHONPATHは気にしなくても良いし、ファイルパスを長々と書かなくても済む。setup.pyは面倒くさがらずに書こう。

環境変数をセットしようとして

td-agent.conf
  # 環境変数を指定 (エラーになる)
  command HOGE=FUGA /path/to/myapp/bin/python -m myapp.command.xxx arg1

とすると次のエラーになってしまった。何か渡したい時は引数を使えと。32512なんて初めて見た。

td-agent.log
2014-08-03 17:59:54 +0900 [warn]: temporarily failed to flush the buffer. next_retry=2014-08-03 17:59:57 +0900 error_class="RuntimeError" error="command returns 32512: HOGE=FUGA /path/to/myapp/bin/python /path/to/myapp/myapp/command/xxx.py /path/to/buffer_path/.20140803.q4ffb5d85bf81f0d4.log"

実行ファイル

引数の最期にバッファファイルのパスが渡される。1行ずつ読んで処理するコードはこんな感じになる。テストコードから各処理を呼びたい場合はメソッドを分けておく。ファイルの読み込みから最後の処理までストリーム処理にしてあげると綺麗に書けるのと、PyMongoであればジェネレータをそのままCollectionのinsertメソッドに渡せるのでパフォーマンスも良い。

/path/to/myapp/myapp/command/xxx.py
# coding=utf-8

import json
import logging
import logging.config
import traceback

logging.config.fileConfig('logging.conf')
logger = logging.getLogger('fluent-exec-out')

def main():
    file_path = parse_args()
    do_something(exclude_error_row(parse(readline(file_path))))

def parse_args():
    return sys.argv[-1]

def readline(file_path):
    with file(file_path) as input:
        for line in input:
            yield line

def parse(lines):
    # 入力フォーマットをjsonとした場合
    for line in lines:
        yield json.loads(line)

def exclude_error_row(rows):
    for row in rows:
        # バリデーションとログ出力
        if xxxxxx:
            logger.warn("Invalid line found %s", row)
            continue
       yield row

def do_something(rows):
    # なにかやる


if __name__ == '__main__':
    logger.info('Start')
    try:
        main()
    except:
        logger.error(traceback.format_exc())
    logger.info('End')

テストコード

mainメソッドのテストはfluentdから渡されるバッファファイルと同じ形式のファイルを作って、引数に渡せば良い。

test_xxx.py
# coding=utf-8

from nose.tools import ok_, eq_

from myapp.command import xxx

original_argv = sys.argv

class TestAll(object):
    def teardown(self):
        sys.argv = original_argv

    def test_main(self):
        sys.argv.append('./tests/data/fluentd_buffer.log')
        xxx.main()
        # 何かテストする

    def test_readline(self):
        gen = xxx.readline('./tests/data/fluentd_buffer.log'):
        # 何かテストする

ログ出力

fluent-logger-pythonなんかを使って、さらにログをfluentdに渡すとかっこいいかもしれない。

フィルタ処理

上記コードではフィルタ処理もやっているが、exec_filter Output Pluginを使って一つ前のmatchでやるのがfluentd的には綺麗かもしれない。どっちでも出来るので好みの問題かな。

フィルタ処理程度なら一つのexec Outputでまとめてやっても良いと思うが、いくつもの処理を詰め込みすぎると途中で処理がコケたらどうするの、という心配毎が増える。exec Outputを分けて別々のコード呼び出しにした方が良いだろう。

14
14
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
14