1. Qiita
  2. 投稿
  3. fluentd

Java で Fluentd の in_forward っぽいことをするライブラリを作りました #fluentd

  • 6
    いいね
  • 0
    コメント

Java で Fluentd の in_forward っぽいことをするライブラリを作りました

作ったもの

Fluentd Forward Server を実装するための Java ライブラリです。

Influent

Forward とは

Forward はネットワーク経由でイベントを送受信するために用いる、Fluentd 標準の仕組みです。

Fluentd プラグインである in_forward, out_forward がその主な実装で、

各種 Fluentd クライアントもこの仕様に則ってイベント送信機能を実現しています。

Influent はサーバ側の実装であり、Java アプリケーションに Forward リクエスト受信機能を提供します。

使い方

イベント処理の流れ

Influent は以下の流れで Forward リクエストを処理します。

  1. クライアントから送られてくるストリームをパーズして EventStream オブジェクトに変換
  2. EventStream を処理するための ForwardCallback を呼び出す
  3. (リクエストが ACK を要求していれば)ForwardCalback の処理が完了した時点で ACK を返す

ユーザーは2番のコールバック処理を記述するだけで Forward サーバを実装できます。

コールバック処理の実装

EventStream を受け取り CompletableFuture を返すメソッドを持った ForwardCallback を実装します。
クライアント側が at least once オプションを有効にしている場合、この CompletableFuture が完了したタイミングで ack を返します。

CompletableFuture<Void> consume(EventStream stream);

データベースに書き込むためのコールバックを実装する例です。
ofSyncConsumer は、Consumer<EventStream>Executor から ForwardCallbak を生成する static ファクトリーメソッドです。

ForwardCallback callback = ForwardCallback.ofSyncConsumer(
  stream -> {
    String tag = stream.getTag().getName();
    for (EventEntry entry : stream.getEntries()) {
      long time = entry.getTime().toEpochMilli();
      String json = entry.getRecord().toJson();
      eventDB.insert(tag, time, json);
    }
  },
  Executors.newFixedThreadPool(16)
);

サーバの起動

あとは ForwardServer のインスタンスを作成し、start するだけです。

// サーバオブジェクト作成
ForwardServer server = new ForwardServer
  .Builder(callback)
  .build();
// 新しいスレッドでサーバを起動
server.start();

サンプルとして届いたイベントを標準出力に書き込む例を用意しているので、暇な方は fluent-cat コマンドを使って試してみてください。

使い道

なんとなく作ったので、すごくドンピシャリな用途を思いついているわけではありません。
Fluentd には膨大なプラグイン資産や充実した機能があるので、普段はそちらを使うべきでしょう。
しいて言えば、Influent は次のようなシチュエーションで役に立つんじゃないかと思っています。

使用しているミドルウェアにRuby のクライアントライブラリがない場合

新しく誕生したミドルウェアや、マイナーなミドルウェアにデータを格納したいケースです。
Java であればほぼ間違いなくクライアントライブラリが存在するので、手軽に書き込み処理を実装できます。
例えば Yahoo が最近 OSS として公開した Pulsar は、現時点では Java クライアントしか提供されていなかったはずです。

最新ミドルウェアを次々と採用しているような環境では案外小回りがきくかもしれません。

パフォーマンスが必要な場合

Java は単純な処理性能が高く、スレッドや IO 周りの API も充実しています。
なので集計、整形、ミドルウェアへの書き込みといったプラグイン部分まで考慮すると、性能上のアドバンテージを得られるユースケースがありそうです。
Fluentd 自体はよくできてるので、単純な処理だと差は出ないと思います。

現在はまだチューニングをサボっているので、Influent 部分がボトルネックになりそうですが……

やりたいこと

せっかくだし、趣味プロダクトの流量の多い箇所で使ってみようと思っています。
そのために、性能の調査と改善を行なっていく予定です。

実装を後回しにしてる機能もいくつかあるので、関心のある方はコントリビューションをお願いします。

Influent

追記

fluentd.org の Related Projects に載せていただきました。

Influent が Fluentd Related Projects の仲間になりました - おくみん公式ブログ