Java で Fluentd の in_forward っぽいことをするライブラリを作りました
作ったもの
Fluentd Forward Server
を実装するための Java ライブラリです。
Forward とは
Forward はネットワーク経由でイベントを送受信するために用いる、Fluentd 標準の仕組みです。
Fluentd プラグインである in_forward
, out_forward
がその主な実装で、
各種 Fluentd クライアントもこの仕様に則ってイベント送信機能を実現しています。
Influent
はサーバ側の実装であり、Java アプリケーションに Forward リクエスト受信機能を提供します。
使い方
イベント処理の流れ
Influent
は以下の流れで Forward リクエストを処理します。
- クライアントから送られてくるストリームをパーズして EventStream オブジェクトに変換
-
EventStream
を処理するための ForwardCallback を呼び出す - (リクエストが ACK を要求していれば)
ForwardCalback
の処理が完了した時点で ACK を返す
ユーザーは2番のコールバック処理を記述するだけで Forward サーバを実装できます。
コールバック処理の実装
EventStream
を受け取り CompletableFuture
を返すメソッドを持った ForwardCallback
を実装します。
クライアント側が at least once オプションを有効にしている場合、この CompletableFuture
が完了したタイミングで ack を返します。
CompletableFuture<Void> consume(EventStream stream);
データベースに書き込むためのコールバックを実装する例です。
ofSyncConsumer
は、Consumer<EventStream>
と Executor
から ForwardCallbak
を生成する static ファクトリーメソッドです。
ForwardCallback callback = ForwardCallback.ofSyncConsumer(
stream -> {
String tag = stream.getTag().getName();
for (EventEntry entry : stream.getEntries()) {
long time = entry.getTime().toEpochMilli();
String json = entry.getRecord().toJson();
eventDB.insert(tag, time, json);
}
},
Executors.newFixedThreadPool(16)
);
サーバの起動
あとは ForwardServer
のインスタンスを作成し、start
するだけです。
// サーバオブジェクト作成
ForwardServer server = new ForwardServer
.Builder(callback)
.build();
// 新しいスレッドでサーバを起動
server.start();
サンプルとして届いたイベントを標準出力に書き込む例を用意しているので、暇な方は fluent-cat
コマンドを使って試してみてください。
使い道
なんとなく作ったので、すごくドンピシャリな用途を思いついているわけではありません。
Fluentd には膨大なプラグイン資産や充実した機能があるので、普段はそちらを使うべきでしょう。
しいて言えば、Influent
は次のようなシチュエーションで役に立つんじゃないかと思っています。
使用しているミドルウェアにRuby のクライアントライブラリがない場合
新しく誕生したミドルウェアや、マイナーなミドルウェアにデータを格納したいケースです。
Java であればほぼ間違いなくクライアントライブラリが存在するので、手軽に書き込み処理を実装できます。
例えば Yahoo が最近 OSS として公開した Pulsar は、現時点では Java クライアントしか提供されていなかったはずです。
最新ミドルウェアを次々と採用しているような環境では案外小回りがきくかもしれません。
パフォーマンスが必要な場合
Java は単純な処理性能が高く、スレッドや IO 周りの API も充実しています。
なので集計、整形、ミドルウェアへの書き込みといったプラグイン部分まで考慮すると、性能上のアドバンテージを得られるユースケースがありそうです。
Fluentd 自体はよくできてるので、単純な処理だと差は出ないと思います。
現在はまだチューニングをサボっているので、Influent
部分がボトルネックになりそうですが……
やりたいこと
せっかくだし、趣味プロダクトの流量の多い箇所で使ってみようと思っています。
そのために、性能の調査と改善を行なっていく予定です。
実装を後回しにしてる機能もいくつかあるので、関心のある方はコントリビューションをお願いします。
追記
fluentd.org の Related Projects に載せていただきました。