Fluentd

fluentdのfilterで例外が発生したとき、最適化ありなしで振る舞いが変わるかの調査記録


背景

諸事情によりfluentdのfilter pluginを書いていたところ、同僚から「optimized_filter_stream だと filter plugin でエラーが起きた場合は filter がない場合と同じ挙動をして(つまりフィルターを通り抜ける)、filter_stream だとエラーになったレコードは消失するような実装になっている気がする。」という話を聞いたので調べてみる


前提


  • fluentd v1.3.2時点での内容


filter chain optimizationとは

filterの処理がpipelineに複数含まれる場合、パフォーマンスの向上を目的として最適化されたコードが呼ばれるようになっている。

def filter_stream(tag, es)

if optimizable?
optimized_filter_stream(tag, es)
else
@filters.reduce(es) { |acc, filter| filter.filter_stream(tag, acc) }
end
end

https://github.com/fluent/fluentd/blob/v1.3.2/lib/fluent/event_router.rb#L173

この optimizable? は filter側で filter_stream を再実装しているものがなければtrueになる。

filter だけで実装しているpluginに限り、filterごとに Fluent::MultiEventStream#add されるのを避け、先にfilterの処理を済ませてから最後に Fluent::MultiEventStream#add することで処理を改善しているみたい。


例外時の振る舞い

最適化ありとなしの状態でそれぞれどのような振る舞いになるか見てみる。

送信するデータとconfは以下の通り。

$ echo '{"dummy":1}' | fluent-cat debug.test

<source>

@type forward
@id forward_input
</source>

<filter debug.test>
@type filter_sample
</filter>

<match debug.test>
@type stdout
</match>


最適化あり

filter pluginを作成して、 filter を下記のように実装したときの動作を見てみる。

      def filter(tag, time, record)

raise 'oops'
end

実行すると下記のようなログが出力された。warnとともに<match debug.test> 部分が出力されている。

2018-12-11 12:51:17 +0900 [warn]: #0 dump an error event: error_class=RuntimeError error="oops" location="/fluent-plugin-filter-sample/lib/fluent/plugin/filter_filter_sample.rb:24:in `filter'" tag="debug.test" time=2018-12-11 12:51:17.912388000 +0900 record={"dummy"=>1}

2018-12-11 12:51:17.912388000 +0900 debug.test: {"dummy":1}


最適化なし

filter_stream を時前実装している場合最適化処理は行われないので、以下のようなコードで動作を見てみる。

      def filter(tag, time, record)

raise 'oops'
end

def filter_stream(tag, es)
super
end

実行すると下記のようなログが出力された。warn最適化ありと違ってwarnしか出力されていない。

2018-12-11 12:54:28 +0900 [warn]: #0 dump an error event: error_class=RuntimeError error="oops" location="/fluent-plugin-filter-sample/lib/fluent/plugin/filter_filter_sample.rb:24:in `filter'" tag="debug.test" time=2018-12-11 12:54:28.848909000 +0900 record={"dummy"=>1}


なぜこうなるか

最適化ありでは、filterで例外が発生した場合でも下記の部分でrecordがevent_streamに追加される

https://github.com/fluent/fluentd/blob/v1.3.2/lib/fluent/event_router.rb#L208

最適化なしの場合、 new_es.addの部分がbegin ~ rescueでくくられているので、filterで例外が起きるとevent_streamに追加されない

https://github.com/fluent/fluentd/blob/v1.3.2/lib/fluent/plugin/filter.rb#L54


期待する動作

例外は起きた場合は fluentd本体のコードで例外がキャッチされ、 router.emit_error_event(tag, time, record, e)

によって <label @ERROR> で捕捉できることを期待する。なので明示的に例外処理を書かずに済ませたいが、そうすると最適化ありなしで例外時のレコードの振る舞いが変わってしまう。


とりあえず

例外が起きたときにfilter処理されずそのまますり抜けてしまうとまずい場合はplugin側で明示的にrescueしたほうが安全、一方でfluentd本体の振る舞いが最適化ありなしで変わってしまうのはおかしいきもするので、そちらの仕様を見直したほうがいいのかもしれない