0. 初めに
先日docker上でしたが、postgres-debezium-kafkaのアーキテクチャを構築し、
DebeziumからKafkaに入るメッセージの中で、operationがcで、lotidが空のものをエラーとして検知するKafka consumerを作成しました。
実際kafkaを初めて触れたので、備忘録的に環境構築と作成手順を記事に残します。
上記docker上で作成しましたが、説明がややこしくなるので記事ではローカル環境で実施します。
前提事項
- 基本的なプログラミング知識
- PostgreSQL, Kafka, Node.jsについての基本的な理解
- 今回はローカル環境を使用する
目次
1. Debeziumとは?
概要
Debeziumは、データベースの変更をキャプチャするためのオープンソースプラットフォームです。
データベースの変更イベントを取得し、Apache Kafkaなどのシステムにストリームする(渡す)ことができます。
ユースケース例
-
データのリアルタイム同期
Debeziumは、例えば、eコマースプラットフォームでの在庫のリアルタイムな同期に利用できます。商品の在庫数が変更されると、Debeziumがこれを検知し、他のサービスやデータベースにリアルタイムで反映できます。
2. Kafkaとは?
概要
正式名称Apache Kafkaは、分散ストリーミングプラットフォームです。
大量のデータをリアルタイムに処理し、アプリケーション間でデータをやり取りすることができます。
全然知らなかったんですが、結構有名とのことで今回勉強になりました。
なぜDebeziumだけでなくKafkaも必要なのか、Kafka Consumerとは?
ここで思うのが、「ん?なんかDebeziumと同じことやってない...?」です。
しかし、明確に2つ使用する意味があります。
Debeziumは、データベースの変更を検知し、それをキャプチャすることができますが、そのデータを効果的に転送、保管、処理するためにはKafkaが必要です。
つまりKafkaを使用することで、大量のデータストリームを効率的に処理し、複数のコンシューマにデータを送信することができます。
3. 全体アーキテクトの俯瞰図
+--------------+ +-----------------+ +-------------+ +----------------+
| PostgreSQL | --> | Debezium | -> | Apache Kafka| --> | Kafka Consumer |
| Database | | Connector | | Broker | | (Node.js) |
+--------------+ +-----------------+ +-------------+ +----------------+
PostgreSQLでのデータ変更は、Debezium Connectorによって検知され、Apache Kafkaに送信されます。
ちなみにKafka Consumerは、Kafkaからメッセージを読み取るアプリケーションです。
Consumerを使用することで、Kafkaにストリームされたデータを利用し、処理することができます。直接Debeziumからデータを読み取ることはできません。
その後、Kafka Consumerがこのデータを読みとっています。
4. 環境構築と設定
手順
- PostgreSQLのインストールと設定
sudo apt update
sudo apt install postgresql postgresql-contrib
sudo -u postgres createuser --interactive
sudo -u postgres createdb [your_database_name]
PostgreSQLデータベースには、以下のようなデータが存在しています。
id,operatorid,lotid,starttrigger,finishtrigger,statusbit
1,operator001,"lot001",t,f,t
2,operator001,"lot002",t,f,t
3,operator003,"lot003",t,f,t
4,operator004,"lot004",t,f,t
-
Debezium Connectorの設定とKafka Connect
DebeziumはKafka Connectを使用していますので、Kafka Connectの設定も必要です。
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "127.0.0.1",
"database.port": "5432",
"database.user": "postgres",
"database.password": "password",
"database.dbname": "your_database_name",
"database.server.name": "dbserver1",
"table.include.list": "schema.table",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
-
Apache Kafkaのインストールと設定
sudo apt-get install kafka sudo systemctl start kafka sudo systemctl enable kafka
5. Kafka Consumerの作成
kafka-nodeとは?
kafka-nodeは、Node.jsでApache Kafkaを利用するためのクライアントライブラリです。
このライブラリを使用することで、Node.jsアプリケーションからApache Kafkaへアクセスし、メッセージのプロデュースやコンシュームを行うことができます。
インストール
kafka-nodeはnpmを通じてインストールすることができます。
npm install kafka-node
基本的な使用方法
以下はkafka-nodeを用いて、Kafka Consumerを作成する基本的なコードです。
const kafka = require('kafka-node');
// Kafkaクライアントの作成
const client = new kafka.KafkaClient({kafkaHost: '127.0.0.1:9092'});
// Consumerの作成
const consumer = new kafka.Consumer(
client,
[{ topic: 'your_topic', partition: 0 }],
{ autoCommit: false }
);
// メッセージ受信時の処理
consumer.on('message', function (message) {
console.log(message.value);
});
このコードは、指定されたKafkaクラスタに接続し、指定されたトピックからメッセージを受信し、コンソールにメッセージの内容を出力します。この基本的な流れを理解し、必要に応じてカスタマイズすることで、様々な用途にkafka-nodeを使用することができます。
手順
-
Node.jsプロジェクトの初期化
mkdir kafka-consumer cd kafka-consumer npm init -y
-
Kafka-nodeのインストール
npm install kafka-node
-
Consumerのコーディング
const kafka = require('kafka-node'); const client = new kafka.KafkaClient({kafkaHost: '127.0.0.1:9092'}); const consumer = new kafka.Consumer( client, [{ topic: 'your_topic', partition: 0 }], { autoCommit: false } ); consumer.on('message', function (message) { const msg = JSON.parse(message.value); if (msg.operation === 'c' && !msg.lotid) { console.error('Error: lotid is empty', msg); } });
Kafkaのメッセージ例
Kafkaからは、以下のような形式のメッセージを取得できます。
{
"operation": "c",
"lotid": "123",
"other_field": "some_value"
}