1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

CDC(Debezium)を使ってRDB(MySQL)の変更イベントをKafkaに挿入しよう

Last updated at Posted at 2022-05-24

概要

以下をやるよ!

image.png

元ネタ

上記のRed Hatさんのチュートリアルを参考に記載しています
以下記載した手順で「なんかうまくいかないぞ」みたいなことがあれば
参照してください

環境構築概要

Dockerだから環境構築らくらく

①Docker Desktop on Windows のインストール
②Zookeeperを起動
③Kafkaを起動
④MySQLデータベースを起動
⑤MySQLコマンドラインクライアントを起動
⑥Debezium(KafkaConnect)を起動
⑦イベント監視(watch-topic)を起動

Docker Desktop on Windows のインストール

以下から取得してインストール
https://docs.docker.com/desktop/windows/install/

以下は無償。それ以外は有料なのでお気をつけを。
・個人利用
・スモールビジネス
・教育機関
・非商用のオープンソースプロジェクト

Zookeeperを起動

Windows PowerShellを管理者モードで起動

以下、コマンドを実行

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:1.9

プロセス確認したければ、別のWindows PowerShellを起動して以下コマンドを実行

docker ps

Kafkaを起動

Windows PowerShellを管理者モードで起動

以下、コマンドを実行

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:1.9

MySQLデータベースを起動

Windows PowerShellを管理者モードで起動

以下、コマンドを実行

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:1.9

MySQLコマンドラインクライアントを起動

Windows PowerShellを管理者モードで起動

以下、コマンドを実行

docker run -it --rm --name mysqlterm --link mysql --rm mysql:8.0 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

DBの中身を確認

mysql> use inventory;
mysql> show tables;

Debezium(KafkaConnect)を起動

Windows PowerShellを管理者モードで起動

以下、コマンドを実行

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:1.9

データベースを監視するためにコネクタを登録

KafkaConnectサーバーのクライアント起動

image.png

以下、コマンド実行

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

イベント監視(watch-topic)を起動

Windows PowerShellを管理者モードで起動

以下、コマンドを実行

docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:1.9 watch-topic -a -k dbserver1.inventory.customers

検証①(UPDATE)

MySQLコマンドラインクライアントを実行しているターミナルで、以下を実行

SELECT * FROM customers;
UPDATE customers SET first_name='Anne Marie' WHERE id=1004;

イベント監視(watch-topic)を確認
※first_nameカラムのデータがAnne Marieに変更されていることが確認できる

 "before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"}
, "after":{"id":1004,"first_name":"Anne Marie","last_name":"Kretchmar","email":"annek@noanswer.org"}

検証②(DDL)

MySQLコマンドラインクライアントを実行しているターミナルで、以下を実行

ALTER TABLE customers ADD (add_col_test text);

イベント監視(watch-topic)を確認 → 反応なし

以下を追加実行

UPDATE customers SET first_name='Anne Marie Hoge' WHERE id=1004;

イベント監視(watch-topic)を確認
※”add_col_test”カラムが追加されていること、first_nameカラムのデータが”Anne Marie Hoge”に変更されたことが確認できる

"after":{"id":1004,"first_name":"Anne Marie Hoge","last_name":"Kretchmar","email":"annek@noanswer.org","test":null,"add_col_test":null}
1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?