背景
fluentdは多様なinput/outputプラグインを持ち、またプラグインの作成も容易であるため、自分で作成したプログラムに合わせてinputプラグインを用意できます。しかし、自身でプログラムを作成している場合は出力側の自由がきくので、third partyあるいは自作のプラグインを利用することなく直接forwardで入力する方法を試してみました。(これが本当に最も低コストな入力なのかは未検証です)
fluentd側の設定
forward入力プラグインを設定ファイル(fluentd.confなど)に記述します。
<source>
type forward
port 24224
bind 0.0.0.0
</source>
送信クライアント側の実装
今回はC++で実装しましたが、基本的な手順はどの言語でも共通するはずです。
TCPでforward入力プラグインに接続する
通常のネットワークプログラミングの手順でTCPにソケットで接続する。
C++の場合は以下のとおり。
int connect_tcp(std::string host, std::string port) {
struct addrinfo hints;
struct addrinfo *result, *rp;
int sock;
int r;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = 0;
hints.ai_protocol = 0;
// IPアドレスを一括で取得
if (0 != (r = getaddrinfo(host.c_str(), port.c_str(),
&hints, &result))) {
std::string errmsg(gai_strerror(r));
throw std::runtime_error("getaddrinfo error: " + errmsg);
}
// 取得したIPアドレスを順次接続してみる
for (rp = result; rp != NULL; rp = rp->ai_next) {
sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sock == -1) {
// 接続失敗、次へ
continue;
}
if (connect(sock, rp->ai_addr, rp->ai_addrlen) != -1) {
// 接続成功
break;
}
}
// 結果を解放
freeaddrinfo(result);
if (rp == NULL) {
// 接続できるアドレスがなかった
throw std::runtime_error("no avaiable address for " + host);
}
// socket descriptorを返す
return sock;
}
Msgpackの構造データを作る
こちらも普通にパッキングして、シリアライズできればOK。データ構造を[tag, [[time,record], [time,record], ...]]
にするのがポイント。http://documents.mazgi.com/fluentd/doc/plugin.html#forward には[tag, time, record]も許容するとあったが、fluentd-0.10.53では正しく読み込まれなかった。
(2014.11.2 追記) [tag, time, record]
でもできました。試行錯誤している段階で何かミスが有ったようです。[tag, time, record]
版のコードをgistに貼っておきます。
以下、C++の場合のコード例。
struct timeval tv;
msgpack::sbuffer buf;
msgpack::packer <msgpack::sbuffer> pk (&buf);
std::string tag("test.tag");
std::string k1("a"), k2("b");
gettimeofday(&tv, NULL);
pk.pack_array(2); // [_, _]
pk.pack(tag); // ["test.tag", _]
pk.pack_array(1); // ["test.tag", [_]]
pk.pack_array(2); // ["test.tag", [[_, _]]]
pk.pack(tv.tv_sec); // ["test.tag", [[123400xxx, _]]]
pk.pack_map(2); // ["test.tag", [[123400xxx, {}]]]
pk.pack(k1); pk.pack(1); // ["test.tag", [[123400xxx, {"a": 1}]]]
pk.pack(k2); pk.pack(2); // ["test.tag", [[123400xxx, {"a": 1, "b": 2}]]]
fluentdに送信する
パックしたmsgpack形式のデータをシリアライズしてsocketに書き込む。
write(sock, buf.data(), buf.size());
まとめ
C++のコードはgistにおいてあります。。デバッグが必要なときは標準入力からうけたmsgpackの内容をダンプするコードとかを書いて nc -l 24224 | msgpack_dump.rb
みたいにすればすぐ中身が見えるので便利です。