Help us understand the problem. What is going on with this article?

Fluentd v0.12 Filter プラグインの使い方と作り方

More than 5 years have passed since last update.

Fluentd v0.12 がリリースされました。git log を見るに v0.10.1 がリリースされたのは 2011/10/16 とのことなので、3年越しのメジャーバージョンアップとなりました。自分が Fluentd の開発に携わってから初ですね。感慨深い。

Fluentd v0.12 には Fluentd v0.12 is Released で言及されているように、今まであった Input プラグイン、Output プラグインに加えて、Filter プラグイン という仕組みが追加されています。本記事ではその使い方、および作り方を解説します。

2014/12/30更新

  • v0.12.2 で filter メソッドで nil を返すとメッセージを削れるようになったので記事を修正。cf. #515
  • 合わせて v0.12.2 で、Filter プラグインのテストヘルパーが追加されたのでテストコードサンプルを修正。cf. #519

概説

今まで Fluentd でメッセージを加工、フィルタリングしたい場合は、Output プラグインに Input プラグインのような実装を混入するという少しトリッキーなことをして実現してきました。例えば、fluent-plugin-grepfluent-plugin-record-reformer などのプラグインがそうです。このようなプラグインが増えてきて、段々と需要があることがわかってきました。

そこで、このようなフィルタリング処理に対して、新たにFilterプラグインという専用の仕組みが導入されました。これによって、フィルタリング処理を効率よく、便利に扱うことができるようになります。

Filter プラグインの使い方

Filter プラグインを使うには <filter></filter> ディレクティブを使用します。

Fluentd v0.12 には fluent-plugin-grep を Filter プラグインとして移植した grep Filter プラグイン、fluent-plugin-record-reformer を Filter プラグインとして移植した record_transformer Filter プラグインが同梱されています。これらを利用した conf を例として書いてみましょう。

<source>
  type dummy
  tag raw.dummy
  dummy {"message":"[WARN] warning message"}
</source>

<filter raw.**>
  type grep
  regexp1 message WARN
</filter>

<filter raw.**>
  type record_transformer
  <record>
    tag ${tag}
  </record>
</filter>

<match raw.**>
  type stdout
</match>

上から順番に filter が適用されていき、最後に match の output プラグインが実行されるという動きになります。UNIX のパイプみたいですね!

比較のために、旧来の Output プラグイン を利用した conf を書いてみます。

<source>
  type dummy
  tag raw.dummy
  dummy {"message":"[WARN] warning message"}
</source>

<match raw.**>
  type grep
  regexp1 message WARN
  add_tag_prefix greped
  remove_tag_prefix raw
</match>

<match greped.**>
  type record_reformer
  tag reformed
  <record>
    tag ${tag}
  </record>
</match>

<filter reformed>
  type stdout
</match>

add_tag_prefix やら remove_tag_prefix やらのオプションを使ってタグを書き換えて、match のパターンも書き換えていますね。Filter プラグインの仕組みを使うとこのような煩わしさから解放されます。やった!╭( ・ㅂ・)و ̑̑

また、タグの書き換え処理をスキップできるため、パフォーマンスもあがっています。

Filter プラグインの作り方

Filter プラグインはまだ前述した grep フィルター、record_transformer フィルターの2つしか存在していないため、本記事では Filter プラグインの作り方を中心に解説していきます。

Filter プラグインの API は以下のようになっています。Fluent::Filter クラスを継承して、次のメソッドを実装します。

module Fluent
  class SomeFilter < Filter
    # プラグインを登録します
    Plugin.register_filter('some', self)

    # 従来の Output プラグインと同様にパラメータの設定をします。
    # start メソッドが呼ばれる前に呼ばれます
    def configure(conf)
      super
      ...
    end

    # 起動する際に呼ばれます
    def start
      super
      ...
    end

    # 終了する際に呼ばれます
    def shutdown
      super
      ...
    end

    # レコードを弄りたい場合はこのメソッドを実装します
    # 弄ったレコードを返します
    # メッセージを削除したい場合は nil を返します (v0.12.2 から)
    # メッセージを追加したい場合は filter_stream をオーバーライドして実装します
    def filter(tag, time, record)
    end

    # EventStream を直接処理したい場合はこのメソッドをオーバーライドして実装します
    # EventStream を返します
    def filter_stream(tag, es)
      new_es = MultiEventStream.new
      es.each { |time, record|
        begin
          filtered_record = filter(tag, time, record)
          new_es.add(time, filtered_record) if filtered_record
        rescue => e
          router.emit_error_event(tag, time, record, e)
        end
      }
      new_es
    end
  end
end

filter(tag, time, record)

引数で渡ってきた record をいじって、それを返します。メッセージを削除したい場合は nil をかえします (v0.12.2 から)

シンプルですが、メッセージを追加する処理はできません。

filter を実装する場合、filter_stream を変更する必要はありません。

filter_stream(tag, es)

EventStream を直接処理したい場合はこのメソッドをオーバーライドして実装します。メッセージを削ったり、追加したり自由度の高い処理をしたい場合はこちらを実装します。

filter_stream を実装する場合、filter を変更する必要はありません。

Filter プラグインの実装サンプル

試しにメッセージを2つに増やすだけの何の役に立つのか分からない Filter プラグインを作成してみます。名前は dup プラグインとします。
以下のコードを lib/fluent/plugin/filter_dup.rb というファイル名で保存します。

# lib/fluent/plugin/filter_dup.rb
module Fluent
  class DupFilter < Filter
    Plugin.register_filter('dup', self)

    def configure(conf)
      super
    end

    def filter_stream(tag, es)
      new_es = MultiEventStream.new
      es.each { |time, record|
        begin
          new_es.add(time, record)
          new_es.add(time, record) # ここを増やしただけ!
        rescue => e
          router.emit_error_event(tag, time, record, e)
        end
      }
      new_es
    end
  end
end

動作確認として次のような conf を書いて、

<source>
  type forward
</source>

<filter **>
  type dup
</filter>

<match **>
  type stdout
</match>

fluentd を起動します。plugin をおいたディレクトリを -p オプションで指定します。

$ fluentd -c fluent.conf -p lib/fluent/plugin

fluent-cat でテストデータを送ってみましょう

$ echo '{"a":"foo","b":"bar"}' | fluent-cat raw.test

メッセージが2つに増えて出力されたら成功です。

Filter プラグインのテストの書き方

dup フィルタープラグインのテストは以下のように書きました。

require 'test/unit'
require 'fluent/log'
require 'fluent/test'
require 'fluent/plugin/test_filter_dup'

class DupFilterTest < Test::Unit::TestCase
  include Fluent

  setup do
    Fluent::Test.setup
    @time = Fluent::Engine.now
  end

  def create_driver(conf = '')
    Test::FilterTestDriver.new(DupFilter).configure(conf, true)
  end

  def filter(config, msgs)
    d = create_driver(config)
    d.run {
      msgs.each {|msg|
        d.filter(msg, @time) # Filterプラグインにメッセージを通す
      }
    }
    filtered = d.filtered_as_array # 結果を受け取る. [tag, time, record]の配列
    filtered.map {|m| m[2] } # record だけ返す
  end

  sub_test_case 'configure' do
    test 'check default' do
      assert_nothing_raised { create_driver }
    end
  end

  sub_test_case 'filter_stream' do
    test 'dup' do
      msg = {"message" => "foo"}
      filtered = filter('', [msg])
      assert_equal([msg, msg], filtered)
    end
  end
end

FilterTestDriver#run で囲ったブロック内で #filter にメッセージを通すと、filter_stream で処理された結果が #filtered (EventStream オブジェクト)、または filtered_as_array (Array オブジェクト)で取得できます。

Filter プラグインの gem 化

Filter プラグインももちろん gem にできます。例えばこんなかんじです
=> fluent-plugin-filter_dup

Filter プラグインは Fluentd v0.12 以上でしか動かないので、.gemspec に

s.add_runtime_dependency "fluentd", ">= 0.12"

と書くようにすると良いでしょう。


もしくは、自作フィルター系 Output プラグインをすでにお持ちの方は同じ gem に filter_xxx.rb ファイルを含めてしまうというのも1手かと思います。

その場合、Fluentd v0.10 の環境に gem がインストールされた場合でも壊してしまわないように、Filter プラグインを以下のように実装すると良いでしょう。

 module Fluent
   class DupFilter < Filter
     ...
-  end
+  end if defined?(Filter) # Support only >= v0.12
 end

おわりに

Fluentd v0.12 で導入された Filter プラグインの使い方、作り方について解説しました。

これからは、がしがし Filter プラグインの仕組みを使っていって欲しいですね\\\ ٩( ˘ω˘ )و ////

参考文献

  1. Fluentd v0.12でのFilterとLabel - Go ahead!
  2. Fluentd v0.12 is Released | Fluentd
sonots
A Ruby, Fluentd, and Chainer Committer. SRE Engineer. Qiitaは小ネタの投稿場所として利用しています。業務コードで、なぜそういう書き方をしているのか解説をQiitaに書いて、コードにはQiitaへのリンクを張る、という使い方をしていることが多いです(自己紹介じゃない)
https://medium.com/@sonots
zozotech
70億人のファッションを技術の力で変えていく
https://tech.zozo.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした