LoginSignup
6
3

More than 5 years have passed since last update.

fluentdでイベントを複製するプラグインの作り方

Posted at

fluentdを利用していると、1つのイベントを複製して別々に処理したいという場面は多いと思います。
例えば、1つのイベントを複数のデータストアに保存したいというようなケースです。

単純に複数のデータストアに保存したいだけなら、copy Output Plugin等を利用すれば実現できます。
しかし、データストアごとに別々の前処理を行った上で保存したいというような要件には対応できません。

今回は、Pluginを自作する事で柔軟なイベント複製を行う方法を紹介します。

実装方針

基本的には、Filter Pluginの中で emit_stream というメソッドを呼び出す事でイベントの複製を行います。

例1: 単純に複製するPlugin

まずはミニマムなサンプルとして、単にイベントを複製して任意のタグに流すPluginを作成してみます。

fluent.confの記述は以下のようなイメージです。
この例では、一つのsourceからfile Outputでのファイル出力と、Elasticsearch Output PluginでのElasticsearchへの書き込みを行っています。
fileというtagでイベントが流れてくるので、これをelasticというtagに向けて複製する、という流れです。

fluent.conf
<source>
  @type forward
  port 24224
</source>

# このプラグインを作る
<filter file>
  @type copy_event
  target_tag elastic
</filter>

<match file>
  @type file
  ~~~
  略
</match>

<match elastic>
  @type elasticsearch
  ~~~
  略
</match>

プラグインの実装は以下のような感じです。

plugins/filter_copy_event.rb
module Fluent
  class CopyEventFilter < Filter

    Fluent::Plugin.register_filter('copy_event', self)

    config_param :target_tag, :string, default: nil

    def filter_stream(tag, es)
      new_es = MultiEventStream.new
      es.each do |time, record|
        begin
          new_es.add(time, record)
        rescue => e
          router.emit_error_event(tag, time, record, e)
        end
      end
      # send_tagに向けてイベントを複製する
      router.emit_stream(send_tag, new_es)
      # イベントをそのまま返す
      es
    end
  end
end

ポイントはrouter.emit_streamです。fluent.conftarget_tagで指定したタグに向けて、流れてきたイベントを複製しています。

念のために自作のpluginを指定した起動方法を載せておきます。

# 先ほどのプラグインをplugins/filter_copy_event.rb で保存
fluentd -c fluent.conf -p plugins

例2: Elasticsearchにはstatusが200のものだけ流す

先ほどの例は単純にイベントを複製しただけだったので、もう少し実用的な例を示してみます。

一つのsourceからファイル出力とElasticsearchへの書き込みを行うという部分は先ほどと同じですが、Elasticsearchへはstatusというrecordが200のものだけ書き込むという処理を行うケースを考えます。
fluent.confは例1のものと同じで、プラグインを作って条件に合うものだけ複製します。

プラグインの実装は以下のような感じです。

plugins/filter_copy_event.rb
module Fluent
  class CopyEventFilter < Filter

    Fluent::Plugin.register_filter('copy_event', self)

    config_param :target_tag, :string, default: nil

    def filter_stream(tag, es)
      new_es = MultiEventStream.new
      es.each do |time, record|
        begin
          status = record['status'].to_i
          if status == 200
            new_es.add(time, record)
          end
        rescue => e
          router.emit_error_event(tag, time, record, e)
        end
      end
      router.emit_stream(send_tag, new_es) unless new_es.empty?
      es
    end
  end
end

これで、fileタグには全てのイベントが流れ、elasticタグにはstatusのrecordが200のものだけが流れます。

終わりに

fluentdを使っていてイベントを複製しようと思ったら、日本語情報が全然なかったのでこの記事を書いてみました。が、イベントを複製する場合にこの方法がベターなのかは、正直分かりません・・・
ただ、今自分が扱っているシステムでは、この方法でDaily約5億件のログを問題なく捌けているので、大きな問題はないのかと思われます。

6
3
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
3