Advent Calendar書くネタなんてなかったからどうしようかマジ焦った^^;
背景
apacheのエラーログなんかをfluentdに投げて、チャットツールに流したりしてる。
そんなことしていると、余計なデータがいっぱいチャットログに出るのでフィルタリングしたいなーって思ってた。
とりあえずfluentdのプラグインをいろいろ組み合わせればやれそうだなーと思っていた。
実際fluent-plugin-grepやfluent-plugin-hipchatとか組み合わせてやってみてできたのだけど、
痒いところに手が届かなかった。
そこでoutputプラグインを自作してそれを使っていた。
ただ自作のプラグインは、目的は達成できたものの、もろもろの条件などをベタ書きした糞コード。
全然イカしてない・・・・・・
なんとかならんのか・・・・
そんなときに神のお告げをきいた:Fluentd v0.12でのFilterとLabel
そして幸いにもその神のお告げは、すでにmasterにマージされているらしかった!!
・・・・ということで早速使ってみた。
爆速で仕様が変わっていた
上のページの通り設定を書いて使おうとしてみたらエラーで起動しなかった><
どうやら設定ファイルの書き方がかわっていたようである。
いろいろ調べたところ、最新の設定ファイルはexampleフォルダ以下にあると判明
filterのサンプル設定ファイルを読んでみる
v0_12_filter.confっていうファイルがあったので多分これだろうと思って読んでみた。
例が3つあるようだ
<source>
type forward
port 24224
</source>
# 例1:正規表現にマッチするレコードのみ通す
<filter foo>
type grep
regexp1 message keep this
</filter>
<match foo>
type stdout
</match>
# 例2:対象レコードにデータ(ホスト名)を追加
<filter bar>
type record_transformer
<record>
hostname ${hostname}
</record>
</filter>
<match bar>
type forward
<server>
host 123.4.2.4
port 24225
</server>
</match>
# 例3:除外条件(helloデータを持つ)にマッチしないレコードを通して、データをdowncase化
<filter foo.bar>
type grep
exclude1 hello .
</filter>
<filter foo.bar>
type record_transformer
enable_ruby true
<record>
name ${name.downcase}
</record>
</filter>
<match foo.bar>
type stdout
</match>
ふむふむ、確かに書き方変わっとる。
でももうこれでわかったので後は試してみよう。
filterプラグインを試してみる
まずは設定ファイルは上のものを流用してみた。
exampleの設定ファイルを一部書き換え
# まずはリポジトリ落としてきて事前準備
$ git clone git@github.com:fluent/fluentd.git fluentd-filter-test
$ cd fluentd-filter-test
$ bundle install
# サンプル設定ファイルをコピってくる
$ cp example/v0_12_filter.conf .
# そのまま標準出力するように一部書き換え
$ vim v0_12_filter.conf
=========
# こんな感じに書き換えます
# -が削除する行、+が追加する行
<match bar>
- type forward
- <server>
- host 123.4.2.4
- port 24225
- </server>
+ type stdout
</match>
=========
書き換えた設定ファイルを読み込ませてfluentd起動
# -cで設定ファイルを指定できます
$ bin/fluentd -c v0_12_filter.conf
こいつはこのまま起動しっぱなしにしておく
例1を試してみる:単純なgrepフィルタリング(grepフィルタ)
別のターミナルからfluentdにデータを飛ばしてみる
# フィルタリングされない例
$ echo '{"message":"keep this please"}' | fluent-cat foo
2014-12-03 18:43:12 +0900 foo: {"message":"keep this please"}
# フィルタリングされる例(なにも標準出力されない)
$ echo '{"message":"Do not keep"}' | fluent-cat foo
例2を試してみる:レコードにデータを追加(record_transformerフィルタ)
# レコードはなんでもいいので実行
$ echo '{"message":"Do not keep"}' | fluent-cat bar
2014-12-03 18:39:09 +0900 bar: {"key1":"value1","hostname":"mymachine.local"}
# ↑こんな感じでホスト名を追加してくれる
例3を試してみる:除外指定とマッチしたデータをdowncase化
# 除外指定にひっかかる例(なにも標準出力されない)
$ echo '{"name":"SADA", "hello":100}' | fluent-cat foo.bar
# 除外指定にひっかからず、downcaseされる例
$ echo '{"name":"SADA"}' | fluent-cat foo.bar
2014-12-03 18:50:17 +0900 foo.bar: {"name":"sada"}
ちなみにgrepは正規表現を使えるようなので複雑なマッチングもできそうです
ふむふむなかなかおもしろい感じ。
Filterってどうなってるんだろう。
そもそも、Filterってどうなってるんだろと思ってしまった。
Filterプラグインをのぞいてみた。
filter_grep.rbやfilter_record_transformer.rbを覗いてみた。
基本的には, configureとfilter_streamの2メソッドでできてるらしい
特にfilter_streamが本体で、タグとイベントストリームの2引数を取得してフィルターした結果を返すようになっています。
簡単ですね。
module Fluent
class GrepFilter < Filter
Fluent::Plugin.register_filter('grep', self)
〜略〜
def configure(conf)
〜略〜
end
def filter_stream(tag, es)
result_es = MultiEventStream.new
es.each do |time, record|
〜略〜
result_es.add(time, record)
end
result_es
〜略〜
end
end
end
調子にのってFilterプラグインを作ってみた
コードを覗いていると持病のあれがあれで、作りたくなってきてしまった。
でも自作する力量なり発想は元よりないので、自作というより他人のコードを流用させてもらった。
レコードをいい感じに間引いてくれるプラグイン。
ただ、これはoutputプラグインとして書かれているので
これをfilterプラグイン化してみた。
それが以下。
# -*- coding: utf-8 -*-
module Fluent
class SuppressFilter < Filter
include Fluent::HandleTagNameMixin
Fluent::Plugin.register_filter('suppress', self)
config_param :attr_keys, :string, :default => nil
config_param :num, :integer, :default => 3
config_param :interval, :integer, :default => 300
unless method_defined?(:log)
define_method("log") { $log }
end
def configure(conf)
super
if ( !@remove_tag_prefix && !@remove_tag_suffix && !@add_tag_prefix && !@add_tag_suffix )
raise ConfigError, "out_suppress: Set remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix."
end
@keys = @attr_keys ? @attr_keys.split(/ *, */) : nil
@slots = {}
end
def filter_stream(tag, es)
new_es = MultiEventStream.new
es.each do |time, record|
key = tag
if @keys
keys = @keys.map do |key|
key.split(/\./).inject(record) {|r, k| r[k] }
end
key = tag + "\0" + keys.join("\0")
end
slot = @slots[key] ||= []
# expire old records time
expired = time.to_f - @interval
while slot.first && (slot.first <= expired)
slot.shift
end
if slot.length >= @num
log.debug "suppressed record: #{record.to_json}"
next
end
slot.push(time.to_f)
_tag = tag.clone
filter_record(_tag, time, record)
if tag != _tag
new_es.add(time, record)
else
log.warn "Drop record #{record} tag '#{tag}' was not replaced. Can't filter record, cause infinity looping. Set remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix correctly."
end
end
new_es
rescue => e
log.warn "failed to suppress events", error_class: e.class, error: e.message
log.warn_backtrace
end
end
end
そして設定ファイルは以下のようにした。
<filter suppress>
type suppress
interval 10
num 2
attr_keys host,message
add_tag_prefix sp.
</filter>
<match suppress>
type stdout
</match>
そして、以下のように実行してみた。
# 何度か同じデータを連投してみると2回で表示がとまる
$ echo '{"id":1,"host":"web01","message":"error!!"}' | fluent-cat suppress
$ echo '{"id":1,"host":"web01","message":"error!!"}' | fluent-cat suppress
$ echo '{"id":1,"host":"web01","message":"error!!"}' | fluent-cat suppress
・・・
# ちょっとだけデータを変更するとまた表示される
$ echo '{"id":1,"host":"web02","message":"error!!"}' | fluent-cat suppress
$ echo '{"id":1,"host":"web02","message":"error!!"}' | fluent-cat suppress
感想
ということでInputプラグインやOutputプラグイン同様Filterプラグインも結構簡単に作れそうだなーということがわかりました。
やれることは別にOutputプラグインとそんなに大差ないのだろうと思う。
けど、Input→Output→Output→Outputとやっていくより、Input→Filter→Filter→Outputとやるほうがなんか気持ちいい。
コード的にも役割分担がはっきりして良くなると思う。
Outputの代わりにFilterを使えば、フィルタリングがチェーンするような場合には、パフォーマンスも上々らしいです。
他にも、Labelなんかを使っていろいろOutputやFilterをまとめたりできるようなので、よりログアグリゲーションが捗りそうです。
さてあとはv0.12のリリースを待つばかり。
いつかな、いつかなー?