はじめに
fluentdを使っていると、今ストリームを流れているデータを見たかったり、そのデータに対して処理(加工や別fluentdへの送信など)を追加したかったりすることがあります。そんなケースで、fluent-plugin-zmq-pubを使うと便利、という話です。実は一年以上前に書いたプラグインで、自分では便利に使っているのだけど、あまりそのようなことをやっている話を聞かないので書いておきます。
ユースケース
ポイントは、一旦fluentdの中に設定を入れてしまえば、その後実際に分岐させる際にfluentdの再起動が不要、言うことと、分岐先の経路でトラブルがあっても本流には(あまり)影響が無い、ということで、以下の様なケースで使えます.
- 実験的にストリームを別のfluentdにも流したい. でも、そっちのfluentdは(例えば新しいプラグインを使っていたりとかで)安定性に不安がある.
- デバッグなどで、今ストリームを流れているデータを見たい(fluent-tailとモチベーションは一緒. fluent-tailの方が楽かも.)
- ストリームに対して集計などの処理をしたい. (SQLで表現できるような集計であることが分かっているなら、Norikraの方が簡単だと思います.)
やり方
概要
ZeroMQのpub-sub機能を使います. fluentd内でZeroMQのソケットに対して常時publishするようにしておき、必要に応じてsubscriberとなるプロセスを1つまたは複数立ち上げます. subscriberがいてもいなくても、いくつあってもfluentdの設定は変わりません. fluentdを起動したままでsubscriberを追加・停止できます. ZeroMQではなく、他のメッセージキューでも同様のことは可能だと思いますが、ZeroMQだとメッセージキューのために別のプロセスを起動する必要が無いので手軽です.
fluent-plugin-zmq-pubのセットアップ
ZeroMQ本体をインストールしておきます. リポジトリの場所等は公式ドキュメント参照. EPELにも入っています. (ffi-rzmqに移行したい.. ⇒ ) ZeroMQのバージョンは3.2以上が必要です
# yum install zeromq3 zeromq3-devel
fluent-plugin-zmq-pub をインストールします.
# fluent-gem install fluent-plugin-zmq-pub
fluentdの設定を追加します. 分岐したくなりそうなところに、out_copyでzmq_pubへの出力を足しておきます. 自分はほぼ全てのメッセージが通過するmatchタグの中に入れています. pubkey
の部分は${tag}
とすればタグが入りますし、好きな固定文字列を入れても構いません. レコード内の値を使うこともできます.
<match anytag.**>
type copy
<store>
# いつもの処理
</store>
<store>
type zmq_pub
pubkey ${tag}
bindaddr tcp://*:5556
flush_interval 1s
bulk_send true
</store>
</match>
fluentdで受け取る
fluent-plugin-zmq-pub 0.0.4で、上記でpublishされたレコードをsubscribeするinputプラグインが追加されました. 以下のようにすることで、別のfluentdプロセスからsubscribeすることができます. publisher
にはout_zmqがpublishしているアドレスを、subkey
は、subscribeするキーを指定します. カンマ区切りで複数キーを指定することができます. キーは前方一致となります. bulk_send
の指定はpublisher側と合わせてください.
<source>
type zmq_sub
publisher tcp://127.0.0.1:5556
bulk_send true
subkey zmq.,zmq2.
</source>
これで、2つのfluentdがZeroMQのpub-subで接続されます. publisherはsubscriberの存在を意識しないので、subscriber側が停止していたりハングしたりしていても問題なく動作します. その代わり、その分のレコードはsubscriber側では抜け落ちる形になります. (ある程度はpublisher側で蓄積され、subscriberが復活したら再送されます)
処理を足してみる
例えば、以下の様なrubyスクリプトを用意します. ここでは、sample_sub.rb
というファイル名で保存します.ffi-rzmq
やmsgpack
は事前にgem install
しておきます.
#!/usr/bin/env ruby
require 'ffi-rzmq'
require 'msgpack'
context = ZMQ::Context.new(1)
subscriber = context.socket(ZMQ::SUB)
subscriber.connect("tcp://localhost:5556")
if ARGV.length > 0
ARGV.each{|s|
subscriber.setsockopt(ZMQ::SUBSCRIBE,s)
}
else
subscriber.setsockopt(ZMQ::SUBSCRIBE,"")
end
while true
msg = ''
while subscriber.recv_string(msg,ZMQ::DONTWAIT) && msg.size > 0
record = MessagePack.unpack(msg.split(" ",2)[1])
puts "tag: #{record[0]}"
puts "time: #{record[1]}"
puts "record: #{record[2]}"
msg = ''
end
sleep(0.1)
end
スクリプトを起動します. スクリプトの引数にタグを渡すと、そのタグだけsubscribeします. 引数無しなら全てのタグがsubscribeされます.
% ./sample_sub.rb
別のシェルから、fluent-catでfluentdにレコードを投げてみます
echo '{"key1": "aaa", "key2":"foo"}' | fluent-cat anytag.zmqtest
そうすると、sample_sub.rb
を起動したシェルに以下のように出力されます.
tag: anytag.zmqtest
time: 1395154312
record: {"key1"=>"aaa", "key2"=>"foo"}
subscriberの起動有無はfluentdのプロセスには影響を与えません(逆に、subscriberがいなければpublishしたメッセージは消えていきます). 複数起動することもできます. あとは、subscriber内でレコードを加工するなり、レコードをそのまま別のfluentdプロセスにfluent-logger経由で投げるなり好きにできます. ZeroMQのバインディングは各言語用のものが用意されているので、subscriberはruby以外の言語で書くこともできます.
最後に
fluent-plugin-zmq-pubを使って、簡単にfluentdのストリームを分岐させる方法を紹介しました. なお、基本的にsubscriberの動作はfluentd本体に影響を与えないですが、例えばsubscriberの処理がストリームに追いつかない場合など、もしかしたら不具合があるかも知れません. 自分がカジュアルに使っている限りでは問題は出てないですが、お気づきの点があれば教えてください.