0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

kafka-nodeのConsumerを使ってみた

Last updated at Posted at 2019-04-07

はじめに

kafka-nodeを少し使う機会があったので、備忘録としてまとめます。
ちなみにnode-rdkafkaも試したけど、開発に使ってるWindowsマシンでビルドできなかったので諦めました。

Consumerの種類

Consumerは4種類あって、機能差分はこんな感じ。
とりあえず一番良いのをくれって時は、ConsumerGroupStreamを使えばよさげ。

名前 Stream機能 Group機能
Consumer × ×
ConsumerStream ×
ConsumerGroup ×
ConsumerGroupStream

Stream機能

StreamAPIで実装されて、バッファを持ってるらしい。多分効率が良い。
ただのConsumerと違って"message"ではなく、"data"でイベント通知されるので注意。

Group機能

Group機能がないConsumerは、明示的に読み込むパーティションを指定する必要がある。(指定しないとデフォルトの0番パーティションのみ読み込まれて、他のパーティションが読み込まれない)
大抵のユースケースでは「複数パーティション、複数コンシューマでスケールアウトしたら良い感じに自動でパーティション割り当てして欲しい」って感じだと思うので、こちらを選択することになると思われる。
逆に「特定のパーティションは、特定のコンシューマに処理させたい」って要件の場合はGroupじゃないやつを使う。

実際に使ってみる

Kafkaを用意する

dockerでさくっと用意して、パーティション数3のtestトピックに適当にメッセージを入れます。

docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 --name test_kafka spotify/kafka
docker exec -it test_kafka bash
cd /opt/kafka*
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> message1
> message2
> message3
> message4
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

docker machineのIPを確認する

docker machineで動かしているので、ローカルから接続する時のIPを確認しておきます。

docker-machine ip
# 192.168.99.100

ConsumerGroupStreamで読み込む

プロジェクトを作成
mkdir kafka-test
cd kafka-test
npm init -f
npm i kafka-node
touch test.js
test.js
const kafka = require('kafka-node')

const consumer = new kafka.ConsumerGroupStream({
  kafkaHost: '192.168.99.100:9092',
  groupId: 'TestGroup',
  fromOffset: 'earliest'
}, 'test')

consumer.on('data', message => {
  console.log(message)
})
実行結果
$ node test.js
{ topic: 'test',
  value: 'message3',
  offset: 0,
  partition: 0,
  highWaterOffset: 1,
  key: null,
  timestamp: 2019-04-07T16:56:45.611Z }
{ topic: 'test',
  value: 'message2',
  offset: 0,
  partition: 1,
  highWaterOffset: 1,
  key: null,
  timestamp: 2019-04-07T16:56:42.667Z }
{ topic: 'test',
  value: 'message1',
  offset: 0,
  partition: 2,
  highWaterOffset: 2,
  key: null,
  timestamp: 2019-04-07T16:56:40.005Z }
{ topic: 'test',
  value: 'message4',
  offset: 1,
  partition: 2,
  highWaterOffset: 2,
  key: null,
  timestamp: 2019-04-07T16:56:48.987Z }

以上です。

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?