LoginSignup
4
3

More than 1 year has passed since last update.

Debezium - データソース (DBなど) の変更を検知して Consumer で受け取る

Last updated at Posted at 2022-01-08

概要

前回の続きです。
今回は Consumer で変更内容を受け取って、メッセージを出力するところまでのメモです。

docker-compose

何度も起動するのが面倒なので docker-compose にしました。
前回のチュートリアルで docker run していた内容を docker-compose に書き直した感じです。

ポイント

  • debezium_network という名前の Docker ネットワークにすべて配置します。
  • version がちょっと古いのは会社の docker のバージョンがちょっと古いからです。v3 を使用すると network 周りの書き方などがちょっと変わるみたいです。
docker-compose.yaml
version: "2.4"
services:
  zookeeper:
    container_name: "zookeeper"
    image: "debezium/zookeeper:1.8"
    networks:
      - "debezium_network"
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"

  kafka:
    container_name: "kafka"
    image: "debezium/kafka:1.8"
    depends_on:
      - "zookeeper"
    networks:
      - "debezium_network"
    ports:
      - "9092:9092"
    environment:
      - "ZOOKEEPER_CONNECT=zookeeper:2181"

  mysql:
    container_name: "mysql"
    image: "debezium/example-mysql:1.8"
    networks:
      - "debezium_network"
    ports:
      - "3306:3306"
    environment:
      - "MYSQL_ROOT_PASSWORD=debezium"
      - "MYSQL_USER=mysqluser"
      - "MYSQL_PASSWORD=mysqlpw"

  connect:
    container_name: "connect"
    image: "debezium/connect:1.8"
    depends_on:
      - "zookeeper"
      - "kafka"
      - "mysql"
    networks:
      - "debezium_network"
    ports:
      - "8083:8083"
    environment:
      - "BOOTSTRAP_SERVERS=kafka:9092"
      - "GROUP_ID=1"
      - "CONFIG_STORAGE_TOPIC=my_connect_configs"
      - "OFFSET_STORAGE_TOPIC=my_connect_offsets"
      - "STATUS_STORAGE_TOPIC=my_connect_statuses"

networks:
  default:
    external:
      name: "bridge"
  debezium_network:
    name: debezium_network
    driver: "bridge"

起動後の KafkaConnect へのDebeziumMySQLコネクタの登録 はとりあえず手動のままで

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

余談

立ち上げたコンテナを一括で終了させるには、滅びの呪文を使うと楽です。

Consumer 準備

データソースに変更が加わったら、 debezium が検知して kafka に変更内容をキューとして登録します。
そのキューに登録された内容を使用して何らかの処理をおこないます。

今回は手っ取り早く、 nodejs でキューが登録されたら、その内容をコンソールに表示するだけのアプリケーションを作ります。

kafka-node というパッケージを使用しました。

package.json
{
  "name": "test-kafka-node",
  "version": "1.0.0",
  "description": "",
  "main": "index.ts",
  "scripts": {
    "start": "ts-node index.ts"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "kafka-node": "^5.0.0",
    "ts-node": "^10.4.0"
  },
  "devDependencies": {
    "@types/kafka-node": "^3.0.0"
  }
}

index.ts
import kafka from "kafka-node";

const client = new kafka.KafkaClient({
  kafkaHost: "kafka:9092",
});
const consumer = new kafka.Consumer(
  client,
  [{ topic: "dbserver1.inventory.customers"}], // ★customers テーブルに変更を加える予定
  {
    autoCommit: true,
    fromOffset: true,
    groupId: "consumer1",
  }
);

consumer.on("message", function (message) {
  console.log(message);
});

console.log("ready.");

Dockerfile も作成しておきます。
こちらからほぼ丸パクリさせていただきました。

# OS: Debian Buster
# Node.js: 14.4.0
FROM node:14.4.0-buster

# Create app directory
WORKDIR /usr/src/app

# Install app dependencies (package.json and package-lock.json)
COPY package*.json ./
RUN npm install

# Bundle app source (server.js)
COPY . .

# Listen port
EXPOSE 8080

# Run Node.js
CMD [ "npm", "run", "start" ]

Consumer 起動

同じネットワーク内で起動させたいので --network debezium_network を指定します。

$ docker image build -t test-kafka-node:latest .
$ docker run -it --rm --name consumer --network debezium_network test-kafka-node

変更を加える

今回は mysql のコンテナに直接入って、Update文を実行してみます。

$ docker exec -it mysql bash
root@d8382f3c83e8:/# mysql -uroot -pdebezium
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 8
Server version: 8.0.27 MySQL Community Server - GPL

Copyright (c) 2000, 2021, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> use inventory;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> UPDATE customers SET first_name='Anne99999' WHERE id=1004;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

上記のように customers テーブルに変更を加えると、

  1. customers テーブルに変更が入る
  2. debezium により kafka にキューが登録される
  3. kafka に入ったキューを nodejs の consumer が拾って console.log(message) が実行されます。

実際に cunsumer のコンテナで message が確認できます。
message から変更内容が取得できるので、その内容をもとに Lambda を叩くなり、リードモデルにデータを挿入したり、メール送ったりと、後はどうにでもなりそうです。

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3