LoginSignup
0
0

【Node.js】Kafka Consumerを作成してみた

Last updated at Posted at 2023-09-27

0. 初めに

先日docker上でしたが、postgres-debezium-kafkaのアーキテクチャを構築し、
DebeziumからKafkaに入るメッセージの中で、operationがcで、lotidが空のものをエラーとして検知するKafka consumerを作成しました。
実際kafkaを初めて触れたので、備忘録的に環境構築と作成手順を記事に残します。

上記docker上で作成しましたが、説明がややこしくなるので記事ではローカル環境で実施します。

前提事項

  • 基本的なプログラミング知識
  • PostgreSQL, Kafka, Node.jsについての基本的な理解
  • 今回はローカル環境を使用する

目次

  1. Debeziumとは?
  2. Kafkaとは?
  3. 全体アーキテクトの俯瞰図
  4. 環境構築と設定
  5. Kafka Consumerの作成
  6. 参考

1. Debeziumとは?

概要

Debeziumは、データベースの変更をキャプチャするためのオープンソースプラットフォームです。
データベースの変更イベントを取得し、Apache Kafkaなどのシステムにストリームする(渡す)ことができます。

ユースケース例

  • データのリアルタイム同期
    Debeziumは、例えば、eコマースプラットフォームでの在庫のリアルタイムな同期に利用できます。商品の在庫数が変更されると、Debeziumがこれを検知し、他のサービスやデータベースにリアルタイムで反映できます。

2. Kafkaとは?

概要

正式名称Apache Kafkaは、分散ストリーミングプラットフォームです。
大量のデータをリアルタイムに処理し、アプリケーション間でデータをやり取りすることができます。
全然知らなかったんですが、結構有名とのことで今回勉強になりました。

なぜDebeziumだけでなくKafkaも必要なのか、Kafka Consumerとは?

ここで思うのが、「ん?なんかDebeziumと同じことやってない...?」です。
しかし、明確に2つ使用する意味があります。

Debeziumは、データベースの変更を検知し、それをキャプチャすることができますが、そのデータを効果的に転送、保管、処理するためにはKafkaが必要です。
つまりKafkaを使用することで、大量のデータストリームを効率的に処理し、複数のコンシューマにデータを送信することができます。

3. 全体アーキテクトの俯瞰図

+--------------+     +-----------------+    +-------------+     +----------------+
| PostgreSQL   | --> | Debezium        | -> | Apache Kafka| --> | Kafka Consumer |
| Database     |     | Connector       |    | Broker      |     | (Node.js)      |
+--------------+     +-----------------+    +-------------+     +----------------+

PostgreSQLでのデータ変更は、Debezium Connectorによって検知され、Apache Kafkaに送信されます。

ちなみにKafka Consumerは、Kafkaからメッセージを読み取るアプリケーションです。
Consumerを使用することで、Kafkaにストリームされたデータを利用し、処理することができます。直接Debeziumからデータを読み取ることはできません。

その後、Kafka Consumerがこのデータを読みとっています。

4. 環境構築と設定

手順

  1. PostgreSQLのインストールと設定
   sudo apt update
   sudo apt install postgresql postgresql-contrib
   sudo -u postgres createuser --interactive
   sudo -u postgres createdb [your_database_name]

PostgreSQLデータベースには、以下のようなデータが存在しています。

id,operatorid,lotid,starttrigger,finishtrigger,statusbit
1,operator001,"lot001",t,f,t
2,operator001,"lot002",t,f,t
3,operator003,"lot003",t,f,t
4,operator004,"lot004",t,f,t
  1. Debezium Connectorの設定とKafka Connect
    DebeziumはKafka Connectを使用していますので、Kafka Connectの設定も必要です。
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "password",
    "database.dbname": "your_database_name",
    "database.server.name": "dbserver1",
    "table.include.list": "schema.table",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}
  1. Apache Kafkaのインストールと設定
    sudo apt-get install kafka
    sudo systemctl start kafka
    sudo systemctl enable kafka
    

5. Kafka Consumerの作成

kafka-nodeとは?

kafka-nodeは、Node.jsでApache Kafkaを利用するためのクライアントライブラリです。
このライブラリを使用することで、Node.jsアプリケーションからApache Kafkaへアクセスし、メッセージのプロデュースやコンシュームを行うことができます。

インストール

kafka-nodeはnpmを通じてインストールすることができます。

npm install kafka-node

基本的な使用方法

以下はkafka-nodeを用いて、Kafka Consumerを作成する基本的なコードです。

const kafka = require('kafka-node');

// Kafkaクライアントの作成
const client = new kafka.KafkaClient({kafkaHost: '127.0.0.1:9092'});

// Consumerの作成
const consumer = new kafka.Consumer(
  client,
  [{ topic: 'your_topic', partition: 0 }],
  { autoCommit: false }
);

// メッセージ受信時の処理
consumer.on('message', function (message) {
  console.log(message.value);
});

このコードは、指定されたKafkaクラスタに接続し、指定されたトピックからメッセージを受信し、コンソールにメッセージの内容を出力します。この基本的な流れを理解し、必要に応じてカスタマイズすることで、様々な用途にkafka-nodeを使用することができます。

手順

  1. Node.jsプロジェクトの初期化

    mkdir kafka-consumer
    cd kafka-consumer
    npm init -y
    
  2. Kafka-nodeのインストール

    npm install kafka-node
    
  3. Consumerのコーディング

    const kafka = require('kafka-node');
    const client = new kafka.KafkaClient({kafkaHost: '127.0.0.1:9092'});
    const consumer = new kafka.Consumer(
      client,
      [{ topic: 'your_topic', partition: 0 }],
      { autoCommit: false }
    );
    
    consumer.on('message', function (message) {
      const msg = JSON.parse(message.value);
      if (msg.operation === 'c' && !msg.lotid) {
        console.error('Error: lotid is empty', msg);
      }
    });
    

Kafkaのメッセージ例

Kafkaからは、以下のような形式のメッセージを取得できます。

{
  "operation": "c",
  "lotid": "123",
  "other_field": "some_value"
}

6. 参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0