Fluentdで何かしたい場合にはプリミティブな処理を行うプラグインを複数組み合わせることが多い。例えばログをパースしたあとにフィルターをかけてからRedisなどの外部サービスに投げたり。その上で足りない部分を補完する形でプラグインを自作するというのが通常の流れ。
よくあるのは、最終的な外部へのポスト方法は一般的なもの(HTTPのPostなど)だけど、インタフェイスを合わせるためにフィルターを結構がんばる必要があるもの。できないことはないけどまるでFluentdの設定でプログラミングをしているかのように設定をつなげていく(実際できる)。それを諦めて専用のプラグインを作る(こっちの方が正しい)。Fluentdのプラグインは作りやすいので毎回作っても良いけど、ちょっとした処理でも毎回作るのは面倒。
そこで最近は汎用的でないちょっとした処理を行うものにはFluentdにデフォルトで組み込まれているout_execプラグインを使っている。
これが意外と運用上も割と使いやすくて便利だったのでどうやって使っているかをまとめておく。
out_exec
out_exec自体は単に指定したコマンドにデータを渡して実行するだけ。
<match test>
type exec
command cat
buffer_path /tmp/test
buffer_type file # デフォルトでfileになっている
</match>
上記のような設定をすると、引数にデータを格納したファイルへのパスが渡されて以下のようなコマンドが実行される。
$ cat /tmp/test.20141227.b50b303ef72b138cb.log
データファイルの中身のフォーマットはformat
オプションで指定可能で、format json
にすると以下のように1行に1 recordが格納される
{"foo":"100","bar":"200","baz":"300"}
{"foo":"101","bar":"201","baz":"301"}
{"foo":"102","bar":"202","baz":"302"}
コマンド側の実装
コマンド側は渡されたファイルに対して処理を行うような処理を好きなように書くだけ。Rubyにこだわらず、シェルスクリプトやPythonなどで書いても良い。
# !/usr/bin/env ruby
require 'json'
data = File.read(ARGV.first).split("\n").map{ |x| JSON.parse(x) }
do_something(data)
上記のようにエラー処理を全く何も考えていないプログラムでも問題無い。途中でエラーが出ればFluentdにより勝手に再送(再実行)される。
開発用のデータの準備
意外と困るのがコマンドを開発するときに渡すデータの準備。特にある程度フィルターをかけてから渡す場合には最終的にどのようなデータになっているのかわかりにくい場合もある。
そんな時もFluentdのout_fileを使えば、実際に渡されるデータと同じ形式のものを保存することができる。
<match test>
type copy
<store>
type file
path /tmp/sample
format json
</store>
<store>
type exec
command cat
buffer_path /tmp/test
format json
</store>
</match>
上記の例ではout_execに渡されるデータをout_fileにも渡している。ポイントはout_fileのオプションにもformat json
をつけていること。これで/tmp/sample以下にサンプルのデータが保存される。
運用中のポイント
エラーログ
コマンドが失敗した場合に自動的に再送は行われるけど、失敗したこと自体の情報は少しわかりにくい。Fluentdのログとしては以下のようにコマンドが失敗したことしか残らない。
2014-12-27 19:46:33 +0900 [warn]: fluent/output.rb:349:rescue in try_flush: failed to flush the buffer. error_class="RuntimeError" error="command returns 256: ./test.rb /tmp/test.20141227.q50b303ef6fe1661c.log" instance=70059072302200
実行したコマンド出力はFluentdの出力になるので、標準出力などを別途ログとして保存している場合には、そちらにコマンドのエラー内容が保存されているはず。Fluentdの内部イベント(fluent.**
のタグ)しか見てない場合には注意です。ちゃんとやるならログは別途落として流した方が良さそう。
デバッグのためにその場で実行
buffer_path
で指定した場所にあるファイルはもちろんそのまま実行することもできるので、コマンドにdryrunモードがあるなら指定して実行することも可能。もちろん冪等性があるならそのまま実行してしまっても良い。
実行ファイルの置き換え
実行するコマンドを修正したい場合はファイルを置き換えるだけでよく、Fluentdを再起動する必要がない。これは割とうれしいことが多い。逆に言えばコマンドの実行は毎回execされるので頻繁に実行する用途では使えない。
処理に失敗したデータの再実行
retry_limit
オプションを設定している場合は、設定した回数以上実行に失敗するとデータが捨てられる。secondary
オプションで捨てる前の処理を指定することができるので、ファイルとして残しておくこともできる。
<match test>
type exec
command ./test.rb
format json
buffer_path /tmp/test
retry_limit 3
<secondary>
type file
path /tmp/secondary/foo
</secondary>
</match>
上記の設定で3回以上失敗した場合は/tmp/secondary/fooにデータが保存される。
このデータの形式は/tmp/testにあったものを全く同じなので、改めて引数に指定して実行すれば再処理も可能。
もしくはdisable_retry_limit true
を追加して無限にリトライし続けることも可能。この場合の注意点はあまりにバッファが溜まり続けてbuffer_chunk_limitを超えた場合は新規に入ってくるデータが完全に捨てられてしまうこと。
オフラインで実行
Fluentdを落とした状態でたまったデータを処理したい場合もbuffer_path
にあるファイルを指定して実行することができる。ただし今は--without-source
オプションがあるので必要無いかもしれない。
データに不正なものが入っていてコマンド側ではスキップしづらい場合は、Fluentdを止めてからファイルに入っている該当箇所を削除して実行することもできる。手動で実行済みなど、ファイル自体いらなくなれば単純に削除すれば、次回Fluentd起動時にはそのファイルが処理されることもない。
最後に
この記事はFluentdのプラグインを書かないことを推奨するものではありません。
また、ここに書かれたテクニックはout_exec以外でも使えるものも含まれています。用量、用法を守って正しく使いましょう。