概要
前回の続きです。
今回は Consumer で変更内容を受け取って、メッセージを出力するところまでのメモです。
docker-compose
何度も起動するのが面倒なので docker-compose にしました。
前回のチュートリアルで docker run
していた内容を docker-compose に書き直した感じです。
ポイント
-
debezium_network
という名前の Docker ネットワークにすべて配置します。 - version がちょっと古いのは会社の docker のバージョンがちょっと古いからです。v3 を使用すると network 周りの書き方などがちょっと変わるみたいです。
version: "2.4"
services:
zookeeper:
container_name: "zookeeper"
image: "debezium/zookeeper:1.8"
networks:
- "debezium_network"
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
kafka:
container_name: "kafka"
image: "debezium/kafka:1.8"
depends_on:
- "zookeeper"
networks:
- "debezium_network"
ports:
- "9092:9092"
environment:
- "ZOOKEEPER_CONNECT=zookeeper:2181"
mysql:
container_name: "mysql"
image: "debezium/example-mysql:1.8"
networks:
- "debezium_network"
ports:
- "3306:3306"
environment:
- "MYSQL_ROOT_PASSWORD=debezium"
- "MYSQL_USER=mysqluser"
- "MYSQL_PASSWORD=mysqlpw"
connect:
container_name: "connect"
image: "debezium/connect:1.8"
depends_on:
- "zookeeper"
- "kafka"
- "mysql"
networks:
- "debezium_network"
ports:
- "8083:8083"
environment:
- "BOOTSTRAP_SERVERS=kafka:9092"
- "GROUP_ID=1"
- "CONFIG_STORAGE_TOPIC=my_connect_configs"
- "OFFSET_STORAGE_TOPIC=my_connect_offsets"
- "STATUS_STORAGE_TOPIC=my_connect_statuses"
networks:
default:
external:
name: "bridge"
debezium_network:
name: debezium_network
driver: "bridge"
起動後の KafkaConnect へのDebeziumMySQLコネクタの登録 はとりあえず手動のままで
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
余談
立ち上げたコンテナを一括で終了させるには、滅びの呪文を使うと楽です。
Consumer 準備
データソースに変更が加わったら、 debezium が検知して kafka に変更内容をキューとして登録します。
そのキューに登録された内容を使用して何らかの処理をおこないます。
今回は手っ取り早く、 nodejs でキューが登録されたら、その内容をコンソールに表示するだけのアプリケーションを作ります。
kafka-node というパッケージを使用しました。
{
"name": "test-kafka-node",
"version": "1.0.0",
"description": "",
"main": "index.ts",
"scripts": {
"start": "ts-node index.ts"
},
"author": "",
"license": "ISC",
"dependencies": {
"kafka-node": "^5.0.0",
"ts-node": "^10.4.0"
},
"devDependencies": {
"@types/kafka-node": "^3.0.0"
}
}
import kafka from "kafka-node";
const client = new kafka.KafkaClient({
kafkaHost: "kafka:9092",
});
const consumer = new kafka.Consumer(
client,
[{ topic: "dbserver1.inventory.customers"}], // ★customers テーブルに変更を加える予定
{
autoCommit: true,
fromOffset: true,
groupId: "consumer1",
}
);
consumer.on("message", function (message) {
console.log(message);
});
console.log("ready.");
Dockerfile も作成しておきます。
こちらからほぼ丸パクリさせていただきました。
# OS: Debian Buster
# Node.js: 14.4.0
FROM node:14.4.0-buster
# Create app directory
WORKDIR /usr/src/app
# Install app dependencies (package.json and package-lock.json)
COPY package*.json ./
RUN npm install
# Bundle app source (server.js)
COPY . .
# Listen port
EXPOSE 8080
# Run Node.js
CMD [ "npm", "run", "start" ]
Consumer 起動
同じネットワーク内で起動させたいので --network debezium_network
を指定します。
$ docker image build -t test-kafka-node:latest .
$ docker run -it --rm --name consumer --network debezium_network test-kafka-node
変更を加える
今回は mysql のコンテナに直接入って、Update文を実行してみます。
$ docker exec -it mysql bash
root@d8382f3c83e8:/# mysql -uroot -pdebezium
mysql: [Warning] Using a password on the command line interface can be insecure.
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 8
Server version: 8.0.27 MySQL Community Server - GPL
Copyright (c) 2000, 2021, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> use inventory;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> UPDATE customers SET first_name='Anne99999' WHERE id=1004;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
上記のように customers テーブルに変更を加えると、
- customers テーブルに変更が入る
- debezium により kafka にキューが登録される
- kafka に入ったキューを nodejs の consumer が拾って
console.log(message)
が実行されます。
実際に cunsumer のコンテナで message が確認できます。
message
から変更内容が取得できるので、その内容をもとに Lambda を叩くなり、リードモデルにデータを挿入したり、メール送ったりと、後はどうにでもなりそうです。