Ruby
Fluentd

Fluentd v0.12の目玉機能らしいFilterを試してみた

More than 3 years have passed since last update.

Advent Calendar書くネタなんてなかったからどうしようかマジ焦った^^;


背景

apacheのエラーログなんかをfluentdに投げて、チャットツールに流したりしてる。

そんなことしていると、余計なデータがいっぱいチャットログに出るのでフィルタリングしたいなーって思ってた。

とりあえずfluentdのプラグインをいろいろ組み合わせればやれそうだなーと思っていた。

実際fluent-plugin-grepfluent-plugin-hipchatとか組み合わせてやってみてできたのだけど、

痒いところに手が届かなかった。

そこでoutputプラグインを自作してそれを使っていた。

ただ自作のプラグインは、目的は達成できたものの、もろもろの条件などをベタ書きした糞コード。

全然イカしてない・・・・・・

なんとかならんのか・・・・

そんなときに神のお告げをきいた:Fluentd v0.12でのFilterとLabel

そして幸いにもその神のお告げは、すでにmasterにマージされているらしかった!!

・・・・ということで早速使ってみた。


爆速で仕様が変わっていた

上のページの通り設定を書いて使おうとしてみたらエラーで起動しなかった><

どうやら設定ファイルの書き方がかわっていたようである。

いろいろ調べたところ、最新の設定ファイルはexampleフォルダ以下にあると判明


filterのサンプル設定ファイルを読んでみる

v0_12_filter.confっていうファイルがあったので多分これだろうと思って読んでみた。

例が3つあるようだ

<source>

type forward
port 24224
</source>

# 例1:正規表現にマッチするレコードのみ通す
<filter foo>
type grep
regexp1 message keep this
</filter>
<match foo>
type stdout
</match>

# 例2:対象レコードにデータ(ホスト名)を追加
<filter bar>
type record_transformer
<record>
hostname ${hostname}
</record>
</filter>
<match bar>
type forward
<server>
host 123.4.2.4
port 24225
</server>
</match>

# 例3:除外条件(helloデータを持つ)にマッチしないレコードを通して、データをdowncase化
<filter foo.bar>
type grep
exclude1 hello .
</filter>
<filter foo.bar>
type record_transformer
enable_ruby true
<record>
name ${name.downcase}
</record>
</filter>
<match foo.bar>
type stdout
</match>

ふむふむ、確かに書き方変わっとる。

でももうこれでわかったので後は試してみよう。


filterプラグインを試してみる

まずは設定ファイルは上のものを流用してみた。


exampleの設定ファイルを一部書き換え

# まずはリポジトリ落としてきて事前準備

$ git clone git@github.com:fluent/fluentd.git fluentd-filter-test
$ cd fluentd-filter-test
$ bundle install

# サンプル設定ファイルをコピってくる
$ cp example/v0_12_filter.conf .

# そのまま標準出力するように一部書き換え
$ vim v0_12_filter.conf
=========
# こんな感じに書き換えます
# -が削除する行、+が追加する行
<match bar>
- type forward
- <server>
- host 123.4.2.4
- port 24225
- </server>
+ type stdout
</match>
=========


書き換えた設定ファイルを読み込ませてfluentd起動

# -cで設定ファイルを指定できます

$ bin/fluentd -c v0_12_filter.conf

こいつはこのまま起動しっぱなしにしておく


例1を試してみる:単純なgrepフィルタリング(grepフィルタ)

別のターミナルからfluentdにデータを飛ばしてみる

# フィルタリングされない例

$ echo '{"message":"keep this please"}' | fluent-cat foo

2014-12-03 18:43:12 +0900 foo: {"message":"keep this please"}

# フィルタリングされる例(なにも標準出力されない)
$ echo '{"message":"Do not keep"}' | fluent-cat foo


例2を試してみる:レコードにデータを追加(record_transformerフィルタ)

# レコードはなんでもいいので実行

$ echo '{"message":"Do not keep"}' | fluent-cat bar

2014-12-03 18:39:09 +0900 bar: {"key1":"value1","hostname":"mymachine.local"}
# ↑こんな感じでホスト名を追加してくれる


例3を試してみる:除外指定とマッチしたデータをdowncase化

# 除外指定にひっかかる例(なにも標準出力されない)

$ echo '{"name":"SADA", "hello":100}' | fluent-cat foo.bar

# 除外指定にひっかからず、downcaseされる例
$ echo '{"name":"SADA"}' | fluent-cat foo.bar

2014-12-03 18:50:17 +0900 foo.bar: {"name":"sada"}

ちなみにgrepは正規表現を使えるようなので複雑なマッチングもできそうです

ふむふむなかなかおもしろい感じ。


Filterってどうなってるんだろう。

そもそも、Filterってどうなってるんだろと思ってしまった。


Filterプラグインをのぞいてみた。

filter_grep.rbfilter_record_transformer.rbを覗いてみた。

基本的には, configureとfilter_streamの2メソッドでできてるらしい

特にfilter_streamが本体で、タグとイベントストリームの2引数を取得してフィルターした結果を返すようになっています。

簡単ですね。

module Fluent

class GrepFilter < Filter
Fluent::Plugin.register_filter('grep', self)
〜略〜

def configure(conf)
〜略〜
end

def filter_stream(tag, es)
result_es = MultiEventStream.new
es.each do |time, record|
〜略〜
result_es.add(time, record)
end
result_es
〜略〜
end

end
end


調子にのってFilterプラグインを作ってみた

コードを覗いていると持病のあれがあれで、作りたくなってきてしまった。

でも自作する力量なり発想は元よりないので、自作というより他人のコードを流用させてもらった。

流用元:fluent-plugin-suppress

レコードをいい感じに間引いてくれるプラグイン。

ただ、これはoutputプラグインとして書かれているので

これをfilterプラグイン化してみた。

それが以下。

# -*- coding: utf-8 -*-

module Fluent
class SuppressFilter < Filter
include Fluent::HandleTagNameMixin

Fluent::Plugin.register_filter('suppress', self)

config_param :attr_keys, :string, :default => nil
config_param :num, :integer, :default => 3
config_param :interval, :integer, :default => 300

unless method_defined?(:log)
define_method("log") { $log }
end

def configure(conf)
super

if ( !@remove_tag_prefix && !@remove_tag_suffix && !@add_tag_prefix && !@add_tag_suffix )
raise ConfigError, "out_suppress: Set remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix."
end

@keys = @attr_keys ? @attr_keys.split(/ *, */) : nil
@slots = {}
end

def filter_stream(tag, es)
new_es = MultiEventStream.new
es.each do |time, record|
key = tag
if @keys
keys = @keys.map do |key|
key.split(/\./).inject(record) {|r, k| r[k] }
end
key = tag + "\0" + keys.join("\0")
end
slot = @slots[key] ||= []

# expire old records time
expired = time.to_f - @interval
while slot.first && (slot.first <= expired)
slot.shift
end

if slot.length >= @num
log.debug "suppressed record: #{record.to_json}"
next
end

slot.push(time.to_f)
_tag = tag.clone
filter_record(_tag, time, record)
if tag != _tag
new_es.add(time, record)
else
log.warn "Drop record #{record} tag '#{tag}' was not replaced. Can't filter record, cause infinity looping. Set remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix correctly."
end
end
new_es
rescue => e
log.warn "failed to suppress events", error_class: e.class, error: e.message
log.warn_backtrace
end
end
end

そして設定ファイルは以下のようにした。

<filter suppress>

type suppress
interval 10
num 2
attr_keys host,message
add_tag_prefix sp.
</filter>

<match suppress>
type stdout
</match>

そして、以下のように実行してみた。

# 何度か同じデータを連投してみると2回で表示がとまる

$ echo '{"id":1,"host":"web01","message":"error!!"}' | fluent-cat suppress
$ echo '{"id":1,"host":"web01","message":"error!!"}' | fluent-cat suppress
$ echo '{"id":1,"host":"web01","message":"error!!"}' | fluent-cat suppress
・・・

# ちょっとだけデータを変更するとまた表示される
$ echo '{"id":1,"host":"web02","message":"error!!"}' | fluent-cat suppress
$ echo '{"id":1,"host":"web02","message":"error!!"}' | fluent-cat suppress


感想

ということでInputプラグインやOutputプラグイン同様Filterプラグインも結構簡単に作れそうだなーということがわかりました。

やれることは別にOutputプラグインとそんなに大差ないのだろうと思う。

けど、Input→Output→Output→Outputとやっていくより、Input→Filter→Filter→Outputとやるほうがなんか気持ちいい。

コード的にも役割分担がはっきりして良くなると思う。

Outputの代わりにFilterを使えば、フィルタリングがチェーンするような場合には、パフォーマンスも上々らしいです。

他にも、Labelなんかを使っていろいろOutputやFilterをまとめたりできるようなので、よりログアグリゲーションが捗りそうです。

さてあとはv0.12のリリースを待つばかり。

いつかな、いつかなー?