1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

メモ:Windows上でapache kafka+kafka-node

1
Posted at

分散メッセージキューkafkaそのものについてはここの解説が詳しい。
あとはこれとか。
AWSのAmazonMSKのようなフルマネージドが楽そうだが、まずはWindows上でどんな感じか試してみる。
環境は少し古いWindows Server 2012
node.jsは16.x

kafkaインストール

こことかここを見ながら。

  • とりあえずhttps://kafka.apache.org/downloadsからバイナリダウンロード
  • 7zipで解凍して適当なフォルダに配置
  • zookeeperは組み込みを利用するので、bin\windows\zookeeper-server-start.bat config\zookeeper.propertiesで起動
  • 別のプロンプトを立ち上げてbin\windows\kafka-server-start.bat config/server.properties

起動してみるとjava.nio.file.AccessDeniedExceptionのエラー
config下にあるlog.dirs関係を全てlog.dirs=G:\tmp\kraft-combined-logsのようにWindows形式のパスに変更すると
java.nio.file.InvalidPathException: Illegal char < > at index 2: G: mpkafka-logs\meta.properties.tmp
となったので、\を\でエスケープしてみるがやはりAccessDeniedExceptionエラー。
フォルダの権限の問題でもない。
調べてみると単純に2.12-3.0.0の問題のようなので古いバージョンのkafka_2.12-2.8.1に変更すると設定を変更しなくても動いた。

さらに別のコンソールを開いてトピックを作成して確認
topc名(test)が出てくるはず。
partitionsはとりあえず1で試せばよいが、consumerが複数の場合はそれ以上のpartitionsが存在している必要あり。

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

なお、削除は
bin\windows\kafka-topics.bat --delete --zookeeper localhost:2181 --topic test

メッセージ送受信(kafka-node)

ダウンロード数を見るとkafkajsやnode-rdkafkaよりkafuka-nodの方が優勢で、
サンプルも多そうなのでkafka-nodeを利用する。

こことかでnode.jsからのアクセス方法が書いてある。

npm install kafka-node

サンプルとは少し変えて、expressでGETで投げたものを適当に直接kafkaに投げてみる

'use strict';
const kafka = require("kafka-node");
const express = require('express');
const app = express();
const port = 3000;
const Producer = kafka.HighLevelProducer;
const Consumer = kafka.Consumer;
const client = new kafka.KafkaClient({
    kafkaHost: "localhost:9092"
});
const producer = new Producer(client, {
	// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
    partitionerType: 1
});
const consumer = new Consumer(
    client,
    [{topic: "test", partision:0}],
    {
        groupId: "my-consumer",
        autoCommit: true,
        fromOffset: true
    }
);

app.get("/", function(req, res, next){
    const name = req.query.name;
    const age = req.query.age;
    const message = [
        {
            topic: "test",
            messages: JSON.stringify({name: name, age: age})
        }
    ];
    producer.send(message, (err, data) => {
        if (err) console.log(err);
        else console.log('send messages');
        //process.exit();
    });
    res.status(200).send("SEND"); // テストなので常に成功扱い
});

// 受信する場合
consumer.on("message", (message, err) => {
    console.log(message);
});

app.listen(port, () => console.log(`Example app listening on port ${port}!`));

送信出来ているかどうかをコマンドでも確認してみると、飛んできている事がわかる。
--from-beginningを指定すると最初から読み込む。

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

受信にはいくつかパターンがあるようで、ここ見ると
new kafka.ConsumerGroupStreamを使うのが一番高機能っぽい。
詳細は公式

SQLを使ってkafkaをコントロールするKSQLのようなものもあるがMSKは対応していなさそう。

マルチConsumer化

起動時に
Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
というようなエラーが出て立ち上がらない事があったが、tmpを全て削除して、アプリも終了させてから起動させると立ち上がった。

tmp等を削除してから再起動して
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test
としてpartitionを複数で立ち上げる。

Consumerは先のnode.jsのものからポートを3001に変更したものを作成。
partitionを指定しているため、[{topic: "test", partition:1}],のようにして別のものを指定して起動。
ブラウザから登録してみると別々の受信をしている事が分かる。

とはいえ、これでは自動でpartitionが割り振られないので使いにくい。
ConsumerGroupStreamを使ってグループで割り振られるようにする。
2つ用意するが、ポートの違いのみ。

'use strict';
const kafka = require("kafka-node");
const express = require('express');
const app = express();
const port = 3000;
const Producer = kafka.HighLevelProducer;

const producer = new Producer(client, {
    partitionerType: 1
});

const consumer = new kafka.ConsumerGroupStream({
    kafkaHost: "localhost:9092",
    groupId: 'my-consumer',
    autoCommit: true,
    fromOffset: 'earliest'
  }, 'test')

app.get("/send", function(req, res, next){
    const name = req.query.name;
    const age = req.query.age;
    const message = [
        {
            topic: "test",
            messages: JSON.stringify({name: name, age: age})
        }
    ];
    producer.send(message, (err, data) => {
        if (err) {console.log(err); res.status(500);}
        else console.log('send messages');
        //process.exit();
    });
    res.status(200).send("SEND");
});

// messageではなくdataになる
consumer.on("data", (message, err) => {
    console.log(message);
});

app.listen(port, () => console.log(`Example app listening on port ${port}!`));

これで実験すると各々が異なるデータを受信出来たので概ねOKぽい。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?