LoginSignup
5
5

More than 5 years have passed since last update.

fluentdのストリームを簡単に分岐させる(fluent-plugin-zmq-pub)

Last updated at Posted at 2014-03-27

はじめに

fluentdを使っていると、今ストリームを流れているデータを見たかったり、そのデータに対して処理(加工や別fluentdへの送信など)を追加したかったりすることがあります。そんなケースで、fluent-plugin-zmq-pubを使うと便利、という話です。実は一年以上前に書いたプラグインで、自分では便利に使っているのだけど、あまりそのようなことをやっている話を聞かないので書いておきます。

ユースケース

ポイントは、一旦fluentdの中に設定を入れてしまえば、その後実際に分岐させる際にfluentdの再起動が不要、言うことと、分岐先の経路でトラブルがあっても本流には(あまり)影響が無い、ということで、以下の様なケースで使えます.

  • 実験的にストリームを別のfluentdにも流したい. でも、そっちのfluentdは(例えば新しいプラグインを使っていたりとかで)安定性に不安がある.
  • デバッグなどで、今ストリームを流れているデータを見たい(fluent-tailとモチベーションは一緒. fluent-tailの方が楽かも.)
  • ストリームに対して集計などの処理をしたい. (SQLで表現できるような集計であることが分かっているなら、Norikraの方が簡単だと思います.)

やり方

概要

ZeroMQのpub-sub機能を使います. fluentd内でZeroMQのソケットに対して常時publishするようにしておき、必要に応じてsubscriberとなるプロセスを1つまたは複数立ち上げます. subscriberがいてもいなくても、いくつあってもfluentdの設定は変わりません. fluentdを起動したままでsubscriberを追加・停止できます. ZeroMQではなく、他のメッセージキューでも同様のことは可能だと思いますが、ZeroMQだとメッセージキューのために別のプロセスを起動する必要が無いので手軽です.

fluent-plugin-zmq-pubのセットアップ

ZeroMQ本体をインストールしておきます. リポジトリの場所等は公式ドキュメント参照. EPELにも入っています. (ffi-rzmqに移行したい.. ⇒ ) ZeroMQのバージョンは3.2以上が必要です

# yum install zeromq3 zeromq3-devel

fluent-plugin-zmq-pub をインストールします.

# fluent-gem install fluent-plugin-zmq-pub

fluentdの設定を追加します. 分岐したくなりそうなところに、out_copyでzmq_pubへの出力を足しておきます. 自分はほぼ全てのメッセージが通過するmatchタグの中に入れています. pubkeyの部分は${tag}とすればタグが入りますし、好きな固定文字列を入れても構いません. レコード内の値を使うこともできます.

<match anytag.**>
    type copy
    <store>
    # いつもの処理
    </store>
    <store>
    type zmq_pub
    pubkey ${tag}
    bindaddr tcp://*:5556
    flush_interval 1s
        bulk_send true
    </store>
</match>

fluentdで受け取る

fluent-plugin-zmq-pub 0.0.4で、上記でpublishされたレコードをsubscribeするinputプラグインが追加されました. 以下のようにすることで、別のfluentdプロセスからsubscribeすることができます. publisherにはout_zmqがpublishしているアドレスを、subkeyは、subscribeするキーを指定します. カンマ区切りで複数キーを指定することができます. キーは前方一致となります. bulk_sendの指定はpublisher側と合わせてください.

<source>
   type zmq_sub
   publisher tcp://127.0.0.1:5556
   bulk_send true
   subkey zmq.,zmq2.
</source>

これで、2つのfluentdがZeroMQのpub-subで接続されます. publisherはsubscriberの存在を意識しないので、subscriber側が停止していたりハングしたりしていても問題なく動作します. その代わり、その分のレコードはsubscriber側では抜け落ちる形になります. (ある程度はpublisher側で蓄積され、subscriberが復活したら再送されます)

処理を足してみる

例えば、以下の様なrubyスクリプトを用意します. ここでは、sample_sub.rbというファイル名で保存します.ffi-rzmqmsgpackは事前にgem installしておきます.

#!/usr/bin/env ruby

require 'ffi-rzmq'
require 'msgpack'

context = ZMQ::Context.new(1)
subscriber = context.socket(ZMQ::SUB)
subscriber.connect("tcp://localhost:5556")

if ARGV.length > 0
  ARGV.each{|s|
    subscriber.setsockopt(ZMQ::SUBSCRIBE,s)
  }
else
  subscriber.setsockopt(ZMQ::SUBSCRIBE,"")
end

while true
  msg = ''
  while subscriber.recv_string(msg,ZMQ::DONTWAIT) && msg.size > 0
    record =  MessagePack.unpack(msg.split(" ",2)[1])
    puts "tag: #{record[0]}"
    puts "time: #{record[1]}"
    puts "record: #{record[2]}"
    msg = ''
  end
  sleep(0.1)
end

スクリプトを起動します. スクリプトの引数にタグを渡すと、そのタグだけsubscribeします. 引数無しなら全てのタグがsubscribeされます.

% ./sample_sub.rb

別のシェルから、fluent-catでfluentdにレコードを投げてみます

echo '{"key1": "aaa", "key2":"foo"}' | fluent-cat anytag.zmqtest

そうすると、sample_sub.rbを起動したシェルに以下のように出力されます.

tag: anytag.zmqtest
time: 1395154312
record: {"key1"=>"aaa", "key2"=>"foo"}

subscriberの起動有無はfluentdのプロセスには影響を与えません(逆に、subscriberがいなければpublishしたメッセージは消えていきます). 複数起動することもできます. あとは、subscriber内でレコードを加工するなり、レコードをそのまま別のfluentdプロセスにfluent-logger経由で投げるなり好きにできます. ZeroMQのバインディングは各言語用のものが用意されているので、subscriberはruby以外の言語で書くこともできます.

最後に

fluent-plugin-zmq-pubを使って、簡単にfluentdのストリームを分岐させる方法を紹介しました. なお、基本的にsubscriberの動作はfluentd本体に影響を与えないですが、例えばsubscriberの処理がストリームに追いつかない場合など、もしかしたら不具合があるかも知れません. 自分がカジュアルに使っている限りでは問題は出てないですが、お気づきの点があれば教えてください.

5
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
5