LoginSignup
29
26

More than 5 years have passed since last update.

Kafka Consumer for Fluentd

Last updated at Posted at 2015-09-18

treasure-data/kafka-fluentd-consumer

Kafkaからデータを取り出してFluentdに転送するJava実装のKafka Consumerを書きました.基本的なことはREADMEに書いてるんですが,一応日本語でも書いておきます.

背景

1トピック1パーティションであれば,すでに公開されているfluent-plugin-kafkaが使えます.ですが,複数トピックだったりをConsumer Groupを使って処理しようとすると,現状fluent-plugin-kafkaが依存しているPoseidonに機能がたりないという問題があります(output側は問題ありません).

海外だとデータ量が多い会社も多く,それに伴いKafkaを使っている会社が少しずつ増えてます.その影響もあり,FluentdとKafkaを連携させたいという要望がちょくちょく出てきたので,Java実装のConsumerを作ったという流れになります.
これで,Fluentdの色々なプラグインも再利用できるようになります.

実装

Kafkaが提供しているHigh Level Consumer APIを使っています.現在リリースしているv0.1.0だと以下の機能があります.

  • Kafkaに入っているJSONのイベントをFluentdに転送
    • tagは固定と,prefix + topicの二つを指定可能
  • 複数トピックをJavaの正規表現を使って指定可能
  • ログはlog4jを利用

みたいな感じで基本的なConsumer実装になります.fluentdにforwardプロトコルを使って転送するので,fluentd側にin_forwardの設定が必要です.

使い方

ビルドはgradleを使っているのでソースを落として来てビルドするか,githubのリリースページでjarも配布しているので,それをダウンロードします.その後は,javaでjarを指定し,第一引数にpropertiesの設定ファイルを渡すだけです.
設定ファイルのサンプルはリポジトリのconfigディレクトリにあるので,これを利用すると早いです.

$ java -Dlog4j.configuration=file://path/to/log4j.properties -jar build/libs/kafka-fluentd-consumer-0.1.0-all.jar /path/to/fluentd-consumer.properties

あるユーザさんはin_execを使ってfluentdから呼び出してるようです.

<source>
  @type forward
</source>
<source>
  @type exec
  tag dummy
  command java -Dlog4j.configuration=file:///path/to/log4j.properties -jar /path/to/kafka-fluentd-consumer-0.1.0-all.jar /path/to/fluentd-consumer.properties
  format json
</source>
<match kafka.**>
  @type stdout
</match>

まとめ

もしKafkaを使っているのであれば,是非試してフィードバックを貰えればなぁという感じです.いくつかのTODOはREADMEに書いてますが,これらはおいおい実装します.あと,今はTreasure Dataリポジトリの下にありますが,別にTD向けの機能とかないので,いずれfluentに移す予定です.

あと,久しぶりにJavaとか書いたので「こう書いた方がいい」みたいなものがあれば,PRしてもらえると助かります.

29
26
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
29
26