23
22

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

fluentdのforward inputに任意の言語で直接ログを流しこむ方法

Last updated at Posted at 2014-10-27

背景

fluentdは多様なinput/outputプラグインを持ち、またプラグインの作成も容易であるため、自分で作成したプログラムに合わせてinputプラグインを用意できます。しかし、自身でプログラムを作成している場合は出力側の自由がきくので、third partyあるいは自作のプラグインを利用することなく直接forwardで入力する方法を試してみました。(これが本当に最も低コストな入力なのかは未検証です)

fluentd側の設定

forward入力プラグインを設定ファイル(fluentd.confなど)に記述します。

<source>
  type forward
  port 24224
  bind 0.0.0.0
</source>

送信クライアント側の実装

今回はC++で実装しましたが、基本的な手順はどの言語でも共通するはずです。

TCPでforward入力プラグインに接続する

通常のネットワークプログラミングの手順でTCPにソケットで接続する。
C++の場合は以下のとおり。

int connect_tcp(std::string host, std::string port) {
  struct addrinfo hints;
  struct addrinfo *result, *rp;
  int sock;
  int r;

  memset(&hints, 0, sizeof(struct addrinfo));
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = 0;
  hints.ai_protocol = 0;

  // IPアドレスを一括で取得
  if (0 != (r = getaddrinfo(host.c_str(), port.c_str(),
                            &hints, &result))) {
    std::string errmsg(gai_strerror(r));
    throw std::runtime_error("getaddrinfo error: " + errmsg);
  }

  // 取得したIPアドレスを順次接続してみる
  for (rp = result; rp != NULL; rp = rp->ai_next) {
    sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
    if (sock == -1) {
      // 接続失敗、次へ
      continue;
    }

    if (connect(sock, rp->ai_addr, rp->ai_addrlen) != -1) {
      // 接続成功
      break;
    }
  }
  
  // 結果を解放
  freeaddrinfo(result);
  
  if (rp == NULL) {
    // 接続できるアドレスがなかった
    throw std::runtime_error("no avaiable address for " + host);
  }
  // socket descriptorを返す
  return sock;
}

Msgpackの構造データを作る

こちらも普通にパッキングして、シリアライズできればOK。データ構造を[tag, [[time,record], [time,record], ...]]にするのがポイント。http://documents.mazgi.com/fluentd/doc/plugin.html#forward には[tag, time, record]も許容するとあったが、fluentd-0.10.53では正しく読み込まれなかった。
(2014.11.2 追記) [tag, time, record] でもできました。試行錯誤している段階で何かミスが有ったようです。[tag, time, record] 版のコードをgistに貼っておきます。

以下、C++の場合のコード例。

struct timeval tv;
msgpack::sbuffer buf;
msgpack::packer <msgpack::sbuffer> pk (&buf);

std::string tag("test.tag");
std::string k1("a"), k2("b");
gettimeofday(&tv, NULL);
pk.pack_array(2);   // [_, _]
pk.pack(tag);       // ["test.tag", _]
pk.pack_array(1);   // ["test.tag", [_]]
pk.pack_array(2);   // ["test.tag", [[_, _]]]
pk.pack(tv.tv_sec); // ["test.tag", [[123400xxx, _]]]
pk.pack_map(2);     // ["test.tag", [[123400xxx, {}]]]
pk.pack(k1); pk.pack(1); // ["test.tag", [[123400xxx, {"a": 1}]]]
pk.pack(k2); pk.pack(2); // ["test.tag", [[123400xxx, {"a": 1, "b": 2}]]]

fluentdに送信する

パックしたmsgpack形式のデータをシリアライズしてsocketに書き込む。

write(sock, buf.data(), buf.size());

まとめ

C++のコードはgistにおいてあります。。デバッグが必要なときは標準入力からうけたmsgpackの内容をダンプするコードとかを書いて nc -l 24224 | msgpack_dump.rb みたいにすればすぐ中身が見えるので便利です。

23
22
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
23
22

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?