Posted at

fluentdでイベントを複製するプラグインの作り方

More than 1 year has passed since last update.

fluentdを利用していると、1つのイベントを複製して別々に処理したいという場面は多いと思います。

例えば、1つのイベントを複数のデータストアに保存したいというようなケースです。

単純に複数のデータストアに保存したいだけなら、copy Output Plugin等を利用すれば実現できます。

しかし、データストアごとに別々の前処理を行った上で保存したいというような要件には対応できません。

今回は、Pluginを自作する事で柔軟なイベント複製を行う方法を紹介します。


実装方針

基本的には、Filter Pluginの中で emit_stream というメソッドを呼び出す事でイベントの複製を行います。


例1: 単純に複製するPlugin

まずはミニマムなサンプルとして、単にイベントを複製して任意のタグに流すPluginを作成してみます。

fluent.confの記述は以下のようなイメージです。

この例では、一つのsourceからfile Outputでのファイル出力と、Elasticsearch Output PluginでのElasticsearchへの書き込みを行っています。

fileというtagでイベントが流れてくるので、これをelasticというtagに向けて複製する、という流れです。


fluent.conf

<source>

@type forward
port 24224
</source>

# このプラグインを作る
<filter file>
@type copy_event
target_tag elastic
</filter>

<match file>
@type file
~~~

</match>

<match elastic>
@type elasticsearch
~~~

</match>


プラグインの実装は以下のような感じです。


plugins/filter_copy_event.rb

module Fluent

class CopyEventFilter < Filter

Fluent::Plugin.register_filter('copy_event', self)

config_param :target_tag, :string, default: nil

def filter_stream(tag, es)
new_es = MultiEventStream.new
es.each do |time, record|
begin
new_es.add(time, record)
rescue => e
router.emit_error_event(tag, time, record, e)
end
end
# send_tagに向けてイベントを複製する
router.emit_stream(send_tag, new_es)
# イベントをそのまま返す
es
end
end
end


ポイントはrouter.emit_streamです。fluent.conftarget_tagで指定したタグに向けて、流れてきたイベントを複製しています。

念のために自作のpluginを指定した起動方法を載せておきます。

# 先ほどのプラグインをplugins/filter_copy_event.rb で保存

fluentd -c fluent.conf -p plugins


例2: Elasticsearchにはstatusが200のものだけ流す

先ほどの例は単純にイベントを複製しただけだったので、もう少し実用的な例を示してみます。

一つのsourceからファイル出力とElasticsearchへの書き込みを行うという部分は先ほどと同じですが、Elasticsearchへはstatusというrecordが200のものだけ書き込むという処理を行うケースを考えます。

fluent.confは例1のものと同じで、プラグインを作って条件に合うものだけ複製します。

プラグインの実装は以下のような感じです。


plugins/filter_copy_event.rb

module Fluent

class CopyEventFilter < Filter

Fluent::Plugin.register_filter('copy_event', self)

config_param :target_tag, :string, default: nil

def filter_stream(tag, es)
new_es = MultiEventStream.new
es.each do |time, record|
begin
status = record['status'].to_i
if status == 200
new_es.add(time, record)
end
rescue => e
router.emit_error_event(tag, time, record, e)
end
end
router.emit_stream(send_tag, new_es) unless new_es.empty?
es
end
end
end


これで、fileタグには全てのイベントが流れ、elasticタグにはstatusのrecordが200のものだけが流れます。


終わりに

fluentdを使っていてイベントを複製しようと思ったら、日本語情報が全然なかったのでこの記事を書いてみました。が、イベントを複製する場合にこの方法がベターなのかは、正直分かりません・・・

ただ、今自分が扱っているシステムでは、この方法でDaily約5億件のログを問題なく捌けているので、大きな問題はないのかと思われます。