fluentdを利用していると、1つのイベントを複製して別々に処理したいという場面は多いと思います。
例えば、1つのイベントを複数のデータストアに保存したいというようなケースです。
単純に複数のデータストアに保存したいだけなら、copy Output Plugin等を利用すれば実現できます。
しかし、データストアごとに別々の前処理を行った上で保存したいというような要件には対応できません。
今回は、Pluginを自作する事で柔軟なイベント複製を行う方法を紹介します。
実装方針
基本的には、Filter Pluginの中で emit_stream
というメソッドを呼び出す事でイベントの複製を行います。
例1: 単純に複製するPlugin
まずはミニマムなサンプルとして、単にイベントを複製して任意のタグに流すPluginを作成してみます。
fluent.confの記述は以下のようなイメージです。
この例では、一つのsourceからfile Output
でのファイル出力と、Elasticsearch Output Plugin
でのElasticsearchへの書き込みを行っています。
file
というtagでイベントが流れてくるので、これをelastic
というtagに向けて複製する、という流れです。
<source>
@type forward
port 24224
</source>
# このプラグインを作る
<filter file>
@type copy_event
target_tag elastic
</filter>
<match file>
@type file
~~~
略
</match>
<match elastic>
@type elasticsearch
~~~
略
</match>
プラグインの実装は以下のような感じです。
module Fluent
class CopyEventFilter < Filter
Fluent::Plugin.register_filter('copy_event', self)
config_param :target_tag, :string, default: nil
def filter_stream(tag, es)
new_es = MultiEventStream.new
es.each do |time, record|
begin
new_es.add(time, record)
rescue => e
router.emit_error_event(tag, time, record, e)
end
end
# send_tagに向けてイベントを複製する
router.emit_stream(send_tag, new_es)
# イベントをそのまま返す
es
end
end
end
ポイントはrouter.emit_stream
です。fluent.conf
のtarget_tag
で指定したタグに向けて、流れてきたイベントを複製しています。
念のために自作のpluginを指定した起動方法を載せておきます。
# 先ほどのプラグインをplugins/filter_copy_event.rb で保存
fluentd -c fluent.conf -p plugins
例2: Elasticsearchにはstatusが200のものだけ流す
先ほどの例は単純にイベントを複製しただけだったので、もう少し実用的な例を示してみます。
一つのsourceからファイル出力とElasticsearchへの書き込みを行うという部分は先ほどと同じですが、Elasticsearchへはstatus
というrecordが200
のものだけ書き込むという処理を行うケースを考えます。
fluent.conf
は例1のものと同じで、プラグインを作って条件に合うものだけ複製します。
プラグインの実装は以下のような感じです。
module Fluent
class CopyEventFilter < Filter
Fluent::Plugin.register_filter('copy_event', self)
config_param :target_tag, :string, default: nil
def filter_stream(tag, es)
new_es = MultiEventStream.new
es.each do |time, record|
begin
status = record['status'].to_i
if status == 200
new_es.add(time, record)
end
rescue => e
router.emit_error_event(tag, time, record, e)
end
end
router.emit_stream(send_tag, new_es) unless new_es.empty?
es
end
end
end
これで、file
タグには全てのイベントが流れ、elastic
タグにはstatus
のrecordが200
のものだけが流れます。
終わりに
fluentdを使っていてイベントを複製しようと思ったら、日本語情報が全然なかったのでこの記事を書いてみました。が、イベントを複製する場合にこの方法がベターなのかは、正直分かりません・・・
ただ、今自分が扱っているシステムでは、この方法でDaily約5億件のログを問題なく捌けているので、大きな問題はないのかと思われます。