はじめに
kafka-nodeを少し使う機会があったので、備忘録としてまとめます。
ちなみにnode-rdkafkaも試したけど、開発に使ってるWindowsマシンでビルドできなかったので諦めました。
Consumerの種類
Consumerは4種類あって、機能差分はこんな感じ。
とりあえず一番良いのをくれって時は、ConsumerGroupStreamを使えばよさげ。
名前 | Stream機能 | Group機能 |
---|---|---|
Consumer | × | × |
ConsumerStream | ○ | × |
ConsumerGroup | × | ○ |
ConsumerGroupStream | ○ | ○ |
Stream機能
StreamAPIで実装されて、バッファを持ってるらしい。多分効率が良い。
ただのConsumerと違って"message"ではなく、"data"でイベント通知されるので注意。
Group機能
Group機能がないConsumerは、明示的に読み込むパーティションを指定する必要がある。(指定しないとデフォルトの0番パーティションのみ読み込まれて、他のパーティションが読み込まれない)
大抵のユースケースでは「複数パーティション、複数コンシューマでスケールアウトしたら良い感じに自動でパーティション割り当てして欲しい」って感じだと思うので、こちらを選択することになると思われる。
逆に「特定のパーティションは、特定のコンシューマに処理させたい」って要件の場合はGroupじゃないやつを使う。
実際に使ってみる
Kafkaを用意する
dockerでさくっと用意して、パーティション数3のtestトピックに適当にメッセージを入れます。
docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 --name test_kafka spotify/kafka
docker exec -it test_kafka bash
cd /opt/kafka*
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> message1
> message2
> message3
> message4
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
docker machineのIPを確認する
docker machineで動かしているので、ローカルから接続する時のIPを確認しておきます。
docker-machine ip
# 192.168.99.100
ConsumerGroupStreamで読み込む
mkdir kafka-test
cd kafka-test
npm init -f
npm i kafka-node
touch test.js
const kafka = require('kafka-node')
const consumer = new kafka.ConsumerGroupStream({
kafkaHost: '192.168.99.100:9092',
groupId: 'TestGroup',
fromOffset: 'earliest'
}, 'test')
consumer.on('data', message => {
console.log(message)
})
$ node test.js
{ topic: 'test',
value: 'message3',
offset: 0,
partition: 0,
highWaterOffset: 1,
key: null,
timestamp: 2019-04-07T16:56:45.611Z }
{ topic: 'test',
value: 'message2',
offset: 0,
partition: 1,
highWaterOffset: 1,
key: null,
timestamp: 2019-04-07T16:56:42.667Z }
{ topic: 'test',
value: 'message1',
offset: 0,
partition: 2,
highWaterOffset: 2,
key: null,
timestamp: 2019-04-07T16:56:40.005Z }
{ topic: 'test',
value: 'message4',
offset: 1,
partition: 2,
highWaterOffset: 2,
key: null,
timestamp: 2019-04-07T16:56:48.987Z }
以上です。