29
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

LivesenseAdvent Calendar 2014

Day 6

Fluentd v0.12の目玉機能らしいFilterを試してみた

Posted at

Advent Calendar書くネタなんてなかったからどうしようかマジ焦った^^;

背景

apacheのエラーログなんかをfluentdに投げて、チャットツールに流したりしてる。
そんなことしていると、余計なデータがいっぱいチャットログに出るのでフィルタリングしたいなーって思ってた。

とりあえずfluentdのプラグインをいろいろ組み合わせればやれそうだなーと思っていた。

実際fluent-plugin-grepfluent-plugin-hipchatとか組み合わせてやってみてできたのだけど、
痒いところに手が届かなかった。

そこでoutputプラグインを自作してそれを使っていた。
ただ自作のプラグインは、目的は達成できたものの、もろもろの条件などをベタ書きした糞コード。

全然イカしてない・・・・・・

なんとかならんのか・・・・

そんなときに神のお告げをきいた:Fluentd v0.12でのFilterとLabel

そして幸いにもその神のお告げは、すでにmasterにマージされているらしかった!!

・・・・ということで早速使ってみた。

爆速で仕様が変わっていた

上のページの通り設定を書いて使おうとしてみたらエラーで起動しなかった><
どうやら設定ファイルの書き方がかわっていたようである。

いろいろ調べたところ、最新の設定ファイルはexampleフォルダ以下にあると判明

filterのサンプル設定ファイルを読んでみる

v0_12_filter.confっていうファイルがあったので多分これだろうと思って読んでみた。

例が3つあるようだ

<source>
  type forward
  port 24224
</source>

# 例1:正規表現にマッチするレコードのみ通す
<filter foo>
  type grep
  regexp1  message keep this
</filter>
<match foo>
  type stdout
</match>

# 例2:対象レコードにデータ(ホスト名)を追加
<filter bar>
  type record_transformer
  <record>
    hostname ${hostname}
  </record>
</filter>
<match bar>
  type forward
  <server>
    host 123.4.2.4
    port 24225
  </server>
</match>

# 例3:除外条件(helloデータを持つ)にマッチしないレコードを通して、データをdowncase化
<filter foo.bar>
  type grep
  exclude1 hello .
</filter>
<filter foo.bar>
  type record_transformer
  enable_ruby true
  <record>
    name ${name.downcase}
  </record>
</filter>
<match foo.bar>
  type stdout
</match>

ふむふむ、確かに書き方変わっとる。

でももうこれでわかったので後は試してみよう。

filterプラグインを試してみる

まずは設定ファイルは上のものを流用してみた。

exampleの設定ファイルを一部書き換え

# まずはリポジトリ落としてきて事前準備
$ git clone git@github.com:fluent/fluentd.git fluentd-filter-test
$ cd fluentd-filter-test
$ bundle install

# サンプル設定ファイルをコピってくる
$ cp example/v0_12_filter.conf .

# そのまま標準出力するように一部書き換え
$ vim v0_12_filter.conf
=========
# こんな感じに書き換えます
# -が削除する行、+が追加する行
 <match bar>
-  type forward
-  <server>
-    host 123.4.2.4
-    port 24225
-  </server>
+  type stdout
 </match>
 =========

書き換えた設定ファイルを読み込ませてfluentd起動

# -cで設定ファイルを指定できます
$ bin/fluentd -c v0_12_filter.conf

こいつはこのまま起動しっぱなしにしておく

例1を試してみる:単純なgrepフィルタリング(grepフィルタ)

別のターミナルからfluentdにデータを飛ばしてみる

# フィルタリングされない例
$ echo '{"message":"keep this please"}' | fluent-cat foo

2014-12-03 18:43:12 +0900 foo: {"message":"keep this please"}

# フィルタリングされる例(なにも標準出力されない)
$ echo '{"message":"Do not keep"}' | fluent-cat foo

例2を試してみる:レコードにデータを追加(record_transformerフィルタ)

# レコードはなんでもいいので実行
$ echo '{"message":"Do not keep"}' | fluent-cat bar

2014-12-03 18:39:09 +0900 bar: {"key1":"value1","hostname":"mymachine.local"}
# ↑こんな感じでホスト名を追加してくれる

例3を試してみる:除外指定とマッチしたデータをdowncase化

# 除外指定にひっかかる例(なにも標準出力されない)
$ echo '{"name":"SADA", "hello":100}' | fluent-cat foo.bar

# 除外指定にひっかからず、downcaseされる例
$ echo '{"name":"SADA"}' | fluent-cat foo.bar

2014-12-03 18:50:17 +0900 foo.bar: {"name":"sada"}

ちなみにgrepは正規表現を使えるようなので複雑なマッチングもできそうです

ふむふむなかなかおもしろい感じ。

Filterってどうなってるんだろう。

そもそも、Filterってどうなってるんだろと思ってしまった。

Filterプラグインをのぞいてみた。

filter_grep.rbfilter_record_transformer.rbを覗いてみた。

基本的には, configureとfilter_streamの2メソッドでできてるらしい

特にfilter_streamが本体で、タグとイベントストリームの2引数を取得してフィルターした結果を返すようになっています。

簡単ですね。

module Fluent
  class GrepFilter < Filter
    Fluent::Plugin.register_filter('grep', self)
    〜略〜

    def configure(conf)
      〜略〜
    end

    def filter_stream(tag, es)
      result_es = MultiEventStream.new
      es.each do |time, record|
          〜略〜
          result_es.add(time, record)
      end
      result_es
      〜略〜
    end

  end
end

調子にのってFilterプラグインを作ってみた

コードを覗いていると持病のあれがあれで、作りたくなってきてしまった。

でも自作する力量なり発想は元よりないので、自作というより他人のコードを流用させてもらった。

流用元:fluent-plugin-suppress

レコードをいい感じに間引いてくれるプラグイン。
ただ、これはoutputプラグインとして書かれているので
これをfilterプラグイン化してみた。

それが以下。

# -*- coding: utf-8 -*-
module Fluent
  class SuppressFilter < Filter
    include Fluent::HandleTagNameMixin

    Fluent::Plugin.register_filter('suppress', self)

    config_param :attr_keys,     :string,  :default => nil
    config_param :num,           :integer, :default => 3
    config_param :interval,      :integer, :default => 300

    unless method_defined?(:log)
      define_method("log") { $log }
    end

    def configure(conf)
      super

      if ( !@remove_tag_prefix && !@remove_tag_suffix && !@add_tag_prefix && !@add_tag_suffix )
        raise ConfigError, "out_suppress: Set remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix."
      end

      @keys  = @attr_keys ? @attr_keys.split(/ *, */) : nil
      @slots = {}
    end

    def filter_stream(tag, es)
      new_es = MultiEventStream.new
      es.each do |time, record|
        key = tag
        if @keys
          keys = @keys.map do |key|
            key.split(/\./).inject(record) {|r, k| r[k] }
          end
          key = tag + "\0" + keys.join("\0")
        end
        slot = @slots[key] ||= []

        # expire old records time
        expired = time.to_f - @interval
        while slot.first && (slot.first <= expired)
          slot.shift
        end

        if slot.length >= @num
          log.debug "suppressed record: #{record.to_json}"
          next
        end

        slot.push(time.to_f)
        _tag = tag.clone
        filter_record(_tag, time, record)
        if tag != _tag
          new_es.add(time, record)
        else
          log.warn "Drop record #{record} tag '#{tag}' was not replaced. Can't filter record, cause infinity looping. Set remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix correctly."
        end
      end
      new_es
    rescue => e
      log.warn "failed to suppress events", error_class: e.class, error: e.message
      log.warn_backtrace
    end
  end
end

そして設定ファイルは以下のようにした。

<filter suppress>
  type            suppress
  interval        10
  num             2
  attr_keys       host,message
  add_tag_prefix  sp.
</filter>

<match suppress>
  type stdout
</match>

そして、以下のように実行してみた。

# 何度か同じデータを連投してみると2回で表示がとまる
$ echo '{"id":1,"host":"web01","message":"error!!"}' | fluent-cat suppress
$ echo '{"id":1,"host":"web01","message":"error!!"}' | fluent-cat suppress
$ echo '{"id":1,"host":"web01","message":"error!!"}' | fluent-cat suppress
・・・

# ちょっとだけデータを変更するとまた表示される
$ echo '{"id":1,"host":"web02","message":"error!!"}' | fluent-cat suppress
$ echo '{"id":1,"host":"web02","message":"error!!"}' | fluent-cat suppress

感想

ということでInputプラグインやOutputプラグイン同様Filterプラグインも結構簡単に作れそうだなーということがわかりました。

やれることは別にOutputプラグインとそんなに大差ないのだろうと思う。
けど、Input→Output→Output→Outputとやっていくより、Input→Filter→Filter→Outputとやるほうがなんか気持ちいい。
コード的にも役割分担がはっきりして良くなると思う。

Outputの代わりにFilterを使えば、フィルタリングがチェーンするような場合には、パフォーマンスも上々らしいです。

他にも、Labelなんかを使っていろいろOutputやFilterをまとめたりできるようなので、よりログアグリゲーションが捗りそうです。

さてあとはv0.12のリリースを待つばかり。

いつかな、いつかなー?

29
30
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
29
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?