Fluentd v0.12 がリリースされました。git log を見るに v0.10.1 がリリースされたのは 2011/10/16 とのことなので、3年越しのメジャーバージョンアップとなりました。自分が Fluentd の開発に携わってから初ですね。感慨深い。
Fluentd v0.12 には Fluentd v0.12 is Released で言及されているように、今まであった Input プラグイン、Output プラグインに加えて、Filter プラグイン という仕組みが追加されています。本記事ではその使い方、および作り方を解説します。
2014/12/30更新
- v0.12.2 で
filter
メソッドでnil
を返すとメッセージを削れるようになったので記事を修正。cf. #515 - 合わせて v0.12.2 で、Filter プラグインのテストヘルパーが追加されたのでテストコードサンプルを修正。cf. #519
概説
今まで Fluentd でメッセージを加工、フィルタリングしたい場合は、Output プラグインに Input プラグインのような実装を混入するという少しトリッキーなことをして実現してきました。例えば、fluent-plugin-grep、fluent-plugin-record-reformer などのプラグインがそうです。このようなプラグインが増えてきて、段々と需要があることがわかってきました。
そこで、このようなフィルタリング処理に対して、新たにFilterプラグインという専用の仕組みが導入されました。これによって、フィルタリング処理を効率よく、便利に扱うことができるようになります。
Filter プラグインの使い方
Filter プラグインを使うには <filter></filter>
ディレクティブを使用します。
Fluentd v0.12 には fluent-plugin-grep を Filter プラグインとして移植した grep Filter プラグイン、fluent-plugin-record-reformer を Filter プラグインとして移植した record_transformer Filter プラグインが同梱されています。これらを利用した conf を例として書いてみましょう。
<source>
type dummy
tag raw.dummy
dummy {"message":"[WARN] warning message"}
</source>
<filter raw.**>
type grep
regexp1 message WARN
</filter>
<filter raw.**>
type record_transformer
<record>
tag ${tag}
</record>
</filter>
<match raw.**>
type stdout
</match>
上から順番に filter が適用されていき、最後に match の output プラグインが実行されるという動きになります。UNIX のパイプみたいですね!
比較のために、旧来の Output プラグイン を利用した conf を書いてみます。
<source>
type dummy
tag raw.dummy
dummy {"message":"[WARN] warning message"}
</source>
<match raw.**>
type grep
regexp1 message WARN
add_tag_prefix greped
remove_tag_prefix raw
</match>
<match greped.**>
type record_reformer
tag reformed
<record>
tag ${tag}
</record>
</match>
<filter reformed>
type stdout
</match>
add_tag_prefix
やら remove_tag_prefix
やらのオプションを使ってタグを書き換えて、match のパターンも書き換えていますね。Filter プラグインの仕組みを使うとこのような煩わしさから解放されます。やった!╭( ・ㅂ・)و ̑̑
また、タグの書き換え処理をスキップできるため、パフォーマンスもあがっています。
444MBのファイルをrecord_reformerを挟んで転送する簡単なベンチマーク,v0.10だと181秒掛かったけど,v0.12だと149秒で終わった.やっぱりタグの書き換えとemitが減るから速くなってるな…よかった #fluentd
— Mr. Fiber (@repeatedly) 2014, 8月 24
Filter プラグインの作り方
Filter プラグインはまだ前述した grep フィルター、record_transformer フィルターの2つしか存在していないため、本記事では Filter プラグインの作り方を中心に解説していきます。
Filter プラグインの API は以下のようになっています。Fluent::Filter クラスを継承して、次のメソッドを実装します。
module Fluent
class SomeFilter < Filter
# プラグインを登録します
Plugin.register_filter('some', self)
# 従来の Output プラグインと同様にパラメータの設定をします。
# start メソッドが呼ばれる前に呼ばれます
def configure(conf)
super
...
end
# 起動する際に呼ばれます
def start
super
...
end
# 終了する際に呼ばれます
def shutdown
super
...
end
# レコードを弄りたい場合はこのメソッドを実装します
# 弄ったレコードを返します
# メッセージを削除したい場合は nil を返します (v0.12.2 から)
# メッセージを追加したい場合は filter_stream をオーバーライドして実装します
def filter(tag, time, record)
end
# EventStream を直接処理したい場合はこのメソッドをオーバーライドして実装します
# EventStream を返します
def filter_stream(tag, es)
new_es = MultiEventStream.new
es.each { |time, record|
begin
filtered_record = filter(tag, time, record)
new_es.add(time, filtered_record) if filtered_record
rescue => e
router.emit_error_event(tag, time, record, e)
end
}
new_es
end
end
end
filter(tag, time, record)
引数で渡ってきた record をいじって、それを返します。メッセージを削除したい場合は nil をかえします (v0.12.2 から)
シンプルですが、メッセージを追加する処理はできません。
filter
を実装する場合、filter_stream
を変更する必要はありません。
filter_stream(tag, es)
EventStream を直接処理したい場合はこのメソッドをオーバーライドして実装します。メッセージを削ったり、追加したり自由度の高い処理をしたい場合はこちらを実装します。
filter_stream
を実装する場合、filter
を変更する必要はありません。
Filter プラグインの実装サンプル
試しにメッセージを2つに増やすだけの何の役に立つのか分からない Filter プラグインを作成してみます。名前は dup プラグインとします。
以下のコードを lib/fluent/plugin/filter_dup.rb
というファイル名で保存します。
# lib/fluent/plugin/filter_dup.rb
module Fluent
class DupFilter < Filter
Plugin.register_filter('dup', self)
def configure(conf)
super
end
def filter_stream(tag, es)
new_es = MultiEventStream.new
es.each { |time, record|
begin
new_es.add(time, record)
new_es.add(time, record) # ここを増やしただけ!
rescue => e
router.emit_error_event(tag, time, record, e)
end
}
new_es
end
end
end
動作確認として次のような conf を書いて、
<source>
type forward
</source>
<filter **>
type dup
</filter>
<match **>
type stdout
</match>
fluentd を起動します。plugin をおいたディレクトリを -p オプションで指定します。
$ fluentd -c fluent.conf -p lib/fluent/plugin
fluent-cat でテストデータを送ってみましょう
$ echo '{"a":"foo","b":"bar"}' | fluent-cat raw.test
メッセージが2つに増えて出力されたら成功です。
Filter プラグインのテストの書き方
dup フィルタープラグインのテストは以下のように書きました。
require 'test/unit'
require 'fluent/log'
require 'fluent/test'
require 'fluent/plugin/test_filter_dup'
class DupFilterTest < Test::Unit::TestCase
include Fluent
setup do
Fluent::Test.setup
@time = Fluent::Engine.now
end
def create_driver(conf = '')
Test::FilterTestDriver.new(DupFilter).configure(conf, true)
end
def filter(config, msgs)
d = create_driver(config)
d.run {
msgs.each {|msg|
d.filter(msg, @time) # Filterプラグインにメッセージを通す
}
}
filtered = d.filtered_as_array # 結果を受け取る. [tag, time, record]の配列
filtered.map {|m| m[2] } # record だけ返す
end
sub_test_case 'configure' do
test 'check default' do
assert_nothing_raised { create_driver }
end
end
sub_test_case 'filter_stream' do
test 'dup' do
msg = {"message" => "foo"}
filtered = filter('', [msg])
assert_equal([msg, msg], filtered)
end
end
end
FilterTestDriver#run
で囲ったブロック内で #filter
にメッセージを通すと、filter_stream
で処理された結果が #filtered
(EventStream オブジェクト)、または filtered_as_array
(Array オブジェクト)で取得できます。
Filter プラグインの gem 化
Filter プラグインももちろん gem にできます。例えばこんなかんじです
=> fluent-plugin-filter_dup
Filter プラグインは Fluentd v0.12 以上でしか動かないので、.gemspec に
s.add_runtime_dependency "fluentd", ">= 0.12"
と書くようにすると良いでしょう。
もしくは、自作フィルター系 Output プラグインをすでにお持ちの方は同じ gem に filter_xxx.rb
ファイルを含めてしまうというのも1手かと思います。
その場合、Fluentd v0.10 の環境に gem がインストールされた場合でも壊してしまわないように、Filter プラグインを以下のように実装すると良いでしょう。
module Fluent
class DupFilter < Filter
...
- end
+ end if defined?(Filter) # Support only >= v0.12
end
おわりに
Fluentd v0.12 で導入された Filter プラグインの使い方、作り方について解説しました。
これからは、がしがし Filter プラグインの仕組みを使っていって欲しいですね\\\ ٩( ˘ω˘ )و ////
参考文献