treasure-data/kafka-fluentd-consumer
Kafkaからデータを取り出してFluentdに転送するJava実装のKafka Consumerを書きました.基本的なことはREADMEに書いてるんですが,一応日本語でも書いておきます.
背景
1トピック1パーティションであれば,すでに公開されているfluent-plugin-kafkaが使えます.ですが,複数トピックだったりをConsumer Groupを使って処理しようとすると,現状fluent-plugin-kafkaが依存しているPoseidonに機能がたりないという問題があります(output側は問題ありません).
海外だとデータ量が多い会社も多く,それに伴いKafkaを使っている会社が少しずつ増えてます.その影響もあり,FluentdとKafkaを連携させたいという要望がちょくちょく出てきたので,Java実装のConsumerを作ったという流れになります.
これで,Fluentdの色々なプラグインも再利用できるようになります.
実装
Kafkaが提供しているHigh Level Consumer APIを使っています.現在リリースしているv0.1.0だと以下の機能があります.
- Kafkaに入っているJSONのイベントをFluentdに転送
- tagは固定と,
prefix + topic
の二つを指定可能
- tagは固定と,
- 複数トピックをJavaの正規表現を使って指定可能
- ログはlog4jを利用
みたいな感じで基本的なConsumer実装になります.fluentdにforwardプロトコルを使って転送するので,fluentd側にin_forward
の設定が必要です.
使い方
ビルドはgradleを使っているのでソースを落として来てビルドするか,githubのリリースページでjarも配布しているので,それをダウンロードします.その後は,javaでjarを指定し,第一引数にpropertiesの設定ファイルを渡すだけです.
設定ファイルのサンプルはリポジトリのconfigディレクトリにあるので,これを利用すると早いです.
$ java -Dlog4j.configuration=file://path/to/log4j.properties -jar build/libs/kafka-fluentd-consumer-0.1.0-all.jar /path/to/fluentd-consumer.properties
あるユーザさんはin_exec
を使ってfluentdから呼び出してるようです.
<source>
@type forward
</source>
<source>
@type exec
tag dummy
command java -Dlog4j.configuration=file:///path/to/log4j.properties -jar /path/to/kafka-fluentd-consumer-0.1.0-all.jar /path/to/fluentd-consumer.properties
format json
</source>
<match kafka.**>
@type stdout
</match>
まとめ
もしKafkaを使っているのであれば,是非試してフィードバックを貰えればなぁという感じです.いくつかのTODOはREADMEに書いてますが,これらはおいおい実装します.あと,今はTreasure Dataリポジトリの下にありますが,別にTD向けの機能とかないので,いずれfluentに移す予定です.
あと,久しぶりにJavaとか書いたので「こう書いた方がいい」みたいなものがあれば,PRしてもらえると助かります.