Introduction
I've been interested in microservices lately.
While looking into how to implement them, I learned that "CDC" is one technique for loosely coupling an application and other services (as in the Transactional Outbox pattern).
One of the tools in that space is "Debezium". I think I roughly know what it can do, but I had never actually touched it, so I'm taking this opportunity to try it out.
What I'll do
- Since I find the prep work (building an environment) rather tedious, I'll start the easy way with the official(?) "debezium-examples" repository.
- Once I've played with it a bit and roughly understand how it behaves, I'll stand up a separate DB server and sync data to it from the change events of the [debezium-examples] DB server.
- If I have energy left, I'd also like to stand up "OpenSearch" (a realistic target from an operational standpoint), sync data to it as well, and confirm that search & display work.
About Debezium
It seems to implement CDC with Kafka at its core.
Searching the web turns up plenty of good articles, so please refer to those for the details. (TBD: add more here if I have the energy.)
Setting up "debezium-examples"
Clone "debezium-examples" with git.
# git clone https://github.com/debezium/debezium-examples.git
# cd debezium-examples/tutorial
Starting the containers
This time I'll build the demo environment following the "Using MySQL" section of the tutorial.
For DEBEZIUM_VERSION, specify the latest version of the Docker images "quay.io/debezium/(zookeeper|kafka|example-mysql)".
# export DEBEZIUM_VERSION=3.1.1.Final
Tweak the docker-compose definition a little before starting the containers
- Switch the network setup from links to networks
- In my case (macOS, M3) I also had to add "platform" to docker-compose.yaml; without it the containers kept dying abnormally (one example being a Java-related crash)
diff --git a/tutorial/docker-compose-mysql.yaml b/tutorial/docker-compose-mysql.yaml
index 4be3508..4228ea2 100644
--- a/tutorial/docker-compose-mysql.yaml
+++ b/tutorial/docker-compose-mysql.yaml
@@ -2,28 +2,38 @@ version: '2'
services:
zookeeper:
image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
+ platform: linux/x86_64
ports:
- 2181:2181
- 2888:2888
- 3888:3888
+ networks:
+ - kafka-demo-net
kafka:
image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
+ platform: linux/x86_64
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
+ networks:
+ - kafka-demo-net
mysql:
image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
+ platform: linux/x86_64
ports:
- 3306:3306
environment:
- MYSQL_ROOT_PASSWORD=debezium
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
+ networks:
+ - kafka-demo-net
connect:
image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
+ platform: linux/x86_64
ports:
- 8083:8083
links:
@@ -35,3 +45,8 @@ services:
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
+ networks:
+ - kafka-demo-net
+networks:
+ kafka-demo-net:
+ driver: bridge
# docker-compose -f docker-compose-mysql.yaml up
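As a side note (not part of the tutorial), the same compose file can also be started detached if you'd rather keep the terminal free, and the logs can still be followed afterwards:
# docker-compose -f docker-compose-mysql.yaml up -d
# docker-compose -f docker-compose-mysql.yaml logs -f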
The container list after docker-compose starts looks like this.
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
5321ba94db8a quay.io/debezium/connect:3.1.1.Final "/docker-entrypoint.…" 36 seconds ago Up 35 seconds 8778/tcp, 0.0.0.0:8083->8083/tcp, 9092/tcp tutorial-connect-1
6b874f3f92c1 quay.io/debezium/kafka:3.1.1.Final "/docker-entrypoint.…" 20 minutes ago Up 35 seconds 0.0.0.0:9092->9092/tcp tutorial-kafka-1
9309f284de47 quay.io/debezium/example-mysql:3.1.1.Final "/entrypoint.sh mysq…" 20 minutes ago Up 35 seconds 0.0.0.0:3306->3306/tcp, 33060-33061/tcp tutorial-mysql-1
8f5ee8b0ffbd quay.io/debezium/zookeeper:3.1.1.Final "/docker-entrypoint.…" 20 minutes ago Up 35 seconds 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp tutorial-zookeeper-1
Checking the MySQL server
Log in to the MySQL server.
# docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
Check the list of databases.
mysql> SHOW DATABASES;
+--------------------+
| Database |
+--------------------+
| information_schema |
| inventory          | ← the database targeted for change capture (more on this later)
| performance_schema |
+--------------------+
3 rows in set (0.00 sec)
List the tables.
mysql> USE inventory;
Database changed
mysql>
mysql>
mysql> SHOW TABLES;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses |
| customers |
| geom |
| orders |
| products |
| products_on_hand |
+---------------------+
6 rows in set (0.01 sec)
Check the record counts. The initial records appear to be preloaded.
Table | Rows |
---|---|
addresses | 7 |
customers | 4 |
geom | 3 |
orders | 4 |
products | 9 |
products_on_hand | 9 |
mysql> select * from addresses;
+----+-------------+---------------------------+------------+--------------+-------+----------+
| id | customer_id | street | city | state | zip | type |
+----+-------------+---------------------------+------------+--------------+-------+----------+
| 10 | 1001 | 3183 Moore Avenue | Euless | Texas | 76036 | SHIPPING |
| 11 | 1001 | 2389 Hidden Valley Road | Harrisburg | Pennsylvania | 17116 | BILLING |
| 12 | 1002 | 281 Riverside Drive | Augusta | Georgia | 30901 | BILLING |
| 13 | 1003 | 3787 Brownton Road | Columbus | Mississippi | 39701 | SHIPPING |
| 14 | 1003 | 2458 Lost Creek Road | Bethlehem | Pennsylvania | 18018 | SHIPPING |
| 15 | 1003 | 4800 Simpson Square | Hillsdale | Oklahoma | 73743 | BILLING |
| 16 | 1004 | 1289 University Hill Road | Canehill | Arkansas | 72717 | LIVING |
+----+-------------+---------------------------+------------+--------------+-------+----------+
7 rows in set (0.01 sec)
mysql>
mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)
mysql> select * from geom;
+----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
| id | g | h |
+----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
| 1 | 0x000000000101000000000000000000F03F000000000000F03F | NULL |
| 2 | 0x000000000102000000020000000000000000000040000000000000F03F00000000000018400000000000001840 | NULL |
| 3 | 0x0000000001030000000100000005000000000000000000000000000000000014400000000000000040000000000000144000000000000000400000000000001C4000000000000000000000000000001C4000000000000000000000000000001440 | NULL |
+----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
3 rows in set (0.01 sec)
mysql> select * from orders;
+--------------+------------+-----------+----------+------------+
| order_number | order_date | purchaser | quantity | product_id |
+--------------+------------+-----------+----------+------------+
| 10001 | 2016-01-16 | 1001 | 1 | 102 |
| 10002 | 2016-01-17 | 1002 | 2 | 105 |
| 10003 | 2016-02-19 | 1002 | 2 | 106 |
| 10004 | 2016-02-21 | 1003 | 1 | 107 |
+--------------+------------+-----------+----------+------------+
4 rows in set (0.00 sec)
mysql> select * from products;
+-----+--------------------+---------------------------------------------------------+--------+
| id | name | description | weight |
+-----+--------------------+---------------------------------------------------------+--------+
| 101 | scooter | Small 2-wheel scooter | 3.14 |
| 102 | car battery | 12V car battery | 8.1 |
| 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8 |
| 104 | hammer | 12oz carpenter's hammer | 0.75 |
| 105 | hammer | 14oz carpenter's hammer | 0.875 |
| 106 | hammer | 16oz carpenter's hammer | 1 |
| 107 | rocks | box of assorted rocks | 5.3 |
| 108 | jacket | water resistent black wind breaker | 0.1 |
| 109 | spare tire | 24 inch spare tire | 22.2 |
+-----+--------------------+---------------------------------------------------------+--------+
9 rows in set (0.00 sec)
mysql> select * from products_on_hand;
+------------+----------+
| product_id | quantity |
+------------+----------+
| 101 | 3 |
| 102 | 8 |
| 103 | 18 |
| 104 | 4 |
| 105 | 5 |
| 106 | 0 |
| 107 | 44 |
| 108 | 2 |
| 109 | 5 |
+------------+----------+
9 rows in set (0.01 sec)
Checking the behavior
Registering the Kafka connector
Let's run the "Start MySQL connector" step from the tutorial. But before that...
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
What exactly is this command doing (and why)?
It looks like it's issuing some kind of command to kafka-connect via its REST API endpoint.
From the Kafka Connect REST API docs: it creates a new connector, returning the current connector info if successful, and returns 409 (Conflict) if a rebalance is in progress or if the connector already exists.
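So if I ever hit that 409 because the connector is already registered, it should be enough to remove it via the same API and register it again (my own note, not from the tutorial):
# curl -X DELETE http://localhost:8083/connectors/inventory-connector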
The connector definition being registered lives in "register-mysql.json", which is provided for the tutorial.
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"topic.prefix": "dbserver1",
"database.include.list": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory"
}
}
It mainly defines the connection settings for the tutorial MySQL server.
The connector class is "io.debezium.connector.mysql.MySqlConnector" (https://debezium.io/documentation/reference/stable/connectors/mysql.html); I assume the available properties differ per connector. (The official docs describe what needs to be set.)
To summarize the tutorial definition: capture changes only for the database "inventory" on the MySQL server (host: mysql, port: 3306, user/password: debezium/dbz).
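As another side note, Kafka Connect seems to expose a validation endpoint as well, so a config like this can apparently be checked before actually registering it. A sketch I haven't relied on in this walkthrough (it assumes jq is available on the host, which the later steps here also use):
# jq .config register-mysql.json \
  | curl -s -X PUT -H "Content-Type:application/json" -d @- \
      http://localhost:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate \
  | jq '.error_count'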
About connector plugins
Roughly speaking, a connector registration like the one above is the instruction that ties a DB to Kafka topics, and the plugins are the connector implementations it can use.
Let's inspect the connector plugins available on the running Connect worker.
# curl http://localhost:8083/connector-plugins | jq
[
{
"class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "3.1.1.Final"
},
{
"class": "io.debezium.connector.mongodb.MongoDbSinkConnector",
"type": "sink",
"version": "3.1.1.Final"
},
{
"class": "io.debezium.connector.db2.Db2Connector",
"type": "source",
"version": "3.1.1.Final"
},
{
"class": "io.debezium.connector.db2as400.As400RpcConnector",
"type": "source",
"version": "3.1.1.Final"
},
{
"class": "io.debezium.connector.informix.InformixConnector",
"type": "source",
"version": "3.1.1.Final"
},
{
"class": "io.debezium.connector.mariadb.MariaDbConnector",
"type": "source",
"version": "3.1.1.Final"
},
{
"class": "io.debezium.connector.mongodb.MongoDbConnector",
"type": "source",
"version": "3.1.1.Final"
},
{
"class": "io.debezium.connector.mysql.MySqlConnector",
"type": "source",
"version": "3.1.1.Final"
},
{
"class": "io.debezium.connector.oracle.OracleConnector",
"type": "source",
"version": "3.1.1.Final"
},
{
"class": "io.debezium.connector.postgresql.PostgresConnector",
"type": "source",
"version": "3.1.1.Final"
},
{
"class": "io.debezium.connector.spanner.SpannerConnector",
"type": "source",
"version": "3.1.1.Final"
},
{
"class": "io.debezium.connector.sqlserver.SqlServerConnector",
"type": "source",
"version": "3.1.1.Final"
},
{
"class": "io.debezium.connector.vitess.VitessConnector",
"type": "source",
"version": "3.1.1.Final"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "3.9.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "3.9.0"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "3.9.0"
}
]
Check the list of active connectors.
# curl http://localhost:8083/connectors
[]
Nothing has been registered yet, so it returns an empty list.
Now, back to the tutorial: let's run "Start MySQL connector".
# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
HTTP/1.1 201 Created
Date: Sun, 04 May 2025 06:47:27 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 501
Server: Jetty(9.4.56.v20240826)
{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","topic.prefix":"dbserver1","database.include.list":"inventory","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","name":"inventory-connector"},"tasks":[],"type":"source"}%
The registration appears to have succeeded. The registered connector name now shows up in the list of active connectors.
# curl http://localhost:8083/connectors
["inventory-connector"]
Consuming messages from the Debezium topic
The tutorial command below should let us observe the change capture.
Let's run it once.
# docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.inventory.customers
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Key"
},
"payload": {
"id": 1001
}
..(snip)
}
The output was too long, so I've truncated it.
The structure appears to consist of the following attributes (for both the key and the value):
[
"payload",
"schema"
]
[
"payload",
"schema"
]
"--from-beginning" is the option that reads from the earliest offset, so dropping it should leave only real-time changes.
What can currently be passed to "--topic"? Let's check which topics exist.
To list topics, "kafka-topics.sh" looks like the right tool.
The "kafka-console-consumer.sh" we ran above lives in the bin directory inside the kafka container, so I'm guessing "kafka-topics.sh" is in the same place.
Running the following listed the topics.
# docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-topics.sh \
--bootstrap-server kafka:9092 \
--list
dbserver1
dbserver1.inventory.addresses
dbserver1.inventory.customers
dbserver1.inventory.geom
dbserver1.inventory.orders
dbserver1.inventory.products
dbserver1.inventory.products_on_hand
my_connect_configs
my_connect_offsets
my_connect_statuses
schema-changes.inventory
Now, let's watch what the change capture looks like when we insert a new record into the "customers" table.
While watching the change capture,
# docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--property print.key=true \
--topic dbserver1.inventory.customers
connect to the MySQL server in another tab.
# docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> USE inventory;
Database changed
mysql> INSERT INTO inventory.customers(first_name, last_name, email) VALUES('Hoge', 'Fuga1', 'fuga.hoge@demo.org');
Query OK, 1 row affected (0.03 sec)
The change capture produced output!
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,first,first_in_data_collection,last_in_data_collection,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","version":1,"field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":2},"payload":{"before":null,"after":{"id":1005,"first_name":"Hoge","last_name":"Fuga1","email":"fuga.hoge@demo.org"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746344616000,"snapshot":"false","db":"inventory","sequence":null,"ts_us":1746344616000000,"ts_ns":1746344616000000000,"table":"customers","server_id":1,"gtid":null,"file":"binlog.000003","pos":392,"row":0,"thread":18,"query":null},"transaction":null,"op":"c","ts_ms":1746344616957,"ts_us":1746344616957490,"ts_ns":1746344616957490921}}
It's hard to read without some processing. The useful information (before/after) is in the second JSON document (the value).
"payload": {
"before": null,
"after": {
"id": 1005,
"first_name": "Hoge",
"last_name": "Fuga1",
"email": "fuga.hoge@demo.org"
},
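To make this easier on the eyes, the consumer output can be piped through jq to keep only the interesting parts of the payload. A quick sketch (without print.key each output line is just the value JSON; -T simply stops docker-compose from allocating a TTY so the pipe works):
# docker-compose -f docker-compose-mysql.yaml exec -T kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --topic dbserver1.inventory.customers \
  | jq '{op: .payload.op, before: .payload.before, after: .payload.after}'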
Next, let's look at an update. Update the email.
mysql> UPDATE inventory.customers SET email = 'fuga.hoge.update@demo.org' WHERE id = 1005;
Query OK, 1 row affected (0.02 sec)
Rows matched: 1 Changed: 1 Warnings: 0
The change capture:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,first,first_in_data_collection,last_in_data_collection,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","version":1,"field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":2},"payload":{"before":{"id":1005,"first_name":"Hoge","last_name":"Fuga1","email":"fuga.hoge@demo.org"},"after":{"id":1005,"first_name":"Hoge","last_name":"Fuga1","email":"fuga.hoge.update@demo.org"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746345642000,"snapshot":"false","db":"inventory","sequence":null,"ts_us":1746345642000000,"ts_ns":1746345642000000000,"table":"customers","server_id":1,"gtid":null,"file":"binlog.000003","pos":740,"row":0,"thread":18,"query":null},"transaction":null,"op":"u","ts_ms":1746345642566,"ts_us":1746345642566122,"ts_ns":1746345642566122215}}
Syncing data to a separate DB server
Add a sink MySQL server to docker-compose-mysql.yaml.
mysql-sink:
image: mysql:8.0
platform: linux/x86_64
ports:
- "3308:3306"
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: database_b
volumes:
- ./mysql-sink-init.sql:/docker-entrypoint-initdb.d/init.sql
networks:
- kafka-demo-net
# cat mysql-sink-init.sql
CREATE DATABASE IF NOT EXISTS database_b;
USE database_b;
CREATE TABLE users (
id INT PRIMARY KEY AUTO_INCREMENT,
first_name VARCHAR(100),
last_name VARCHAR(100),
email VARCHAR(100)
);
# cat register-mysql-sink.json
{
"name": "mysql-sink-connector",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://mysql-sink:3306/database_b",
"connection.username": "root",
"connection.password": "root",
"topics": "dbserver1.inventory.customers",
"table.name.format": "users",
"auto.create": "true",
"auto.evolve": "true",
"auto.evolve.schemas": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"primary.key.fields": "id",
"primary.key.mode": "record_key",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": true,
"value.converter.schemas.enable": true
}
}
Restart docker-compose. The container list now looks like this.
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
042d24b0f468 mysql:8.0 "docker-entrypoint.s…" 3 seconds ago Up 3 seconds 33060/tcp, 0.0.0.0:3308->3306/tcp tutorial-mysql-sink-1
↑ this container has been added
5321ba94db8a quay.io/debezium/connect:3.1.1.Final "/docker-entrypoint.…" 2 hours ago Up 2 hours 8778/tcp, 0.0.0.0:8083->8083/tcp, 9092/tcp tutorial-connect-1
6b874f3f92c1 quay.io/debezium/kafka:3.1.1.Final "/docker-entrypoint.…" 2 hours ago Up 2 hours 0.0.0.0:9092->9092/tcp tutorial-kafka-1
9309f284de47 quay.io/debezium/example-mysql:3.1.1.Final "/entrypoint.sh mysq…" 2 hours ago Up 2 hours 0.0.0.0:3306->3306/tcp, 33060-33061/tcp tutorial-mysql-1
8f5ee8b0ffbd quay.io/debezium/zookeeper:3.1.1.Final "/docker-entrypoint.…" 2 hours ago Up 2 hours 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp tutorial-zookeeper-1
Checking the sink MySQL server
# docker-compose -f docker-compose-mysql.yaml exec mysql-sink bash -c 'mysql -u root -proot database_b'
mysql> SHOW DATABASES;
+--------------------+
| Database |
+--------------------+
| database_b |
| information_schema |
| mysql |
| performance_schema |
| sys |
+--------------------+
5 rows in set (0.00 sec)
mysql> USE database_b
Database changed
mysql> SHOW TABLES;
+----------------------+
| Tables_in_database_b |
+----------------------+
| users |
+----------------------+
1 row in set (0.00 sec)
mysql> select * from users;
Empty set (0.01 sec)
Add the sink connector.
# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql-sink.json
HTTP/1.1 201 Created
Date: Sun, 04 May 2025 08:29:57 GMT
Location: http://localhost:8083/connectors/mysql-sink-connector
Content-Type: application/json
Content-Length: 744
Server: Jetty(9.4.56.v20240826)
{"name":"mysql-sink-connector","config":{"connector.class":"io.debezium.connector.jdbc.JdbcSinkConnector","tasks.max":"1","connection.url":"jdbc:mysql://mysql-sink:3306/database_b","connection.username":"root","connection.password":"root","topics":"dbserver1.inventory.customers","table.name.format":"users","auto.create":"true","auto.evolve":"true","auto.evolve.schemas":"true","insert.mode":"upsert","delete.enabled":"true","primary.key.fields":"id","primary.key.mode":"record_key","key.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","name":"mysql-sink-connector"},"tasks":[],"type":"sink"}
# curl -i -H "Accept:application/json" http://localhost:8083/connectors/mysql-sink-connector/status
{"name":"mysql-sink-connector","connector":{"state":"RUNNING","worker_id":"172.20.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.20.0.5:8083"}],"type":"sink"}
# curl http://localhost:8083/connectors
["mysql-sink-connector","inventory-connector"]
Now let's insert new data and see whether it gets synced.
# docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> USE inventory;
mysql> INSERT INTO inventory.customers(first_name, last_name, email) VALUES('Hoge', 'Fuga2', 'fuga2.hoge@demo.org');
Query OK, 1 row affected (0.01 sec)
Check the sink MySQL.
# docker-compose -f docker-compose-mysql.yaml exec mysql-sink bash -c 'mysql -u root -proot database_b'
mysql> select * from users;
+------+------------+-----------+---------------------------+
| id | first_name | last_name | email |
+------+------------+-----------+---------------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
| 1005 | Hoge | Fuga1 | fuga.hoge.update@demo.org |
| 1006 | Hoge | Fuga2 | fuga2.hoge@demo.org |
+------+------------+-----------+---------------------------+
6 rows in set (0.00 sec)
All the records were there.
They were presumably loaded at the point the connector was added...
The newly inserted record is there too, so the sync looks successful.
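If you want to confirm where the sink actually started consuming from, the consumer group offsets can be inspected. A sketch, assuming Kafka Connect's default naming of sink consumer groups as connect-&lt;connector name&gt; and that kafka-consumer-groups.sh sits next to the other scripts in the kafka container:
# docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-consumer-groups.sh \
    --bootstrap-server kafka:9092 \
    --describe \
    --group connect-mysql-sink-connector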
Let's try an update as well. Update the email just like before.
# docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> USE inventory;
mysql> UPDATE inventory.customers SET email = 'fuga2.hoge.update@demo.org' WHERE id = 1006;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Check the sink MySQL: it has been synced.
mysql> select * from users;
+------+------------+-----------+----------------------------+
| id | first_name | last_name | email |
+------+------------+-----------+----------------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
| 1005 | Hoge | Fuga1 | fuga.hoge.update@demo.org |
| 1006 | Hoge | Fuga2 | fuga2.hoge.update@demo.org |
+------+------------+-----------+----------------------------+
6 rows in set (0.01 sec)
Syncing data to "OpenSearch"
A plugin for the OpenSearch sync needs to be installed into the Kafka Connect worker.
https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
Create a "connect-plugins" directory under tutorial and move the downloaded plugin into it.
# ls -l tutorial/connect-plugins
total 0
drwxr-xr-x@ 8 tetsu staff 256 5 4 18:15 confluentinc-kafka-connect-elasticsearch-15.0.0
Then add it to the connect container's volumes as a plugin location, and it becomes usable.
connect:
image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
(..省略)
volumes:
- ./connect-plugins/confluentinc-kafka-connect-elasticsearch-15.0.0:/kafka/connect/confluentinc-kafka-connect-elasticsearch-15.0.0
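After adding the volume, the connect container has to be recreated for the plugin to be picked up; recreating just that one service should be enough, for example:
# docker-compose -f docker-compose-mysql.yaml up -d connect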
Check the plugins.
# curl -s localhost:8083/connector-plugins | jq '.[].class'
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
↑ this one has been added.
"io.debezium.connector.jdbc.JdbcSinkConnector"
"io.debezium.connector.mongodb.MongoDbSinkConnector"
"io.debezium.connector.db2.Db2Connector"
"io.debezium.connector.db2as400.As400RpcConnector"
"io.debezium.connector.informix.InformixConnector"
"io.debezium.connector.mariadb.MariaDbConnector"
"io.debezium.connector.mongodb.MongoDbConnector"
"io.debezium.connector.mysql.MySqlConnector"
"io.debezium.connector.oracle.OracleConnector"
"io.debezium.connector.postgresql.PostgresConnector"
"io.debezium.connector.spanner.SpannerConnector"
"io.debezium.connector.sqlserver.SqlServerConnector"
"io.debezium.connector.vitess.VitessConnector"
"org.apache.kafka.connect.mirror.MirrorCheckpointConnector"
"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"
"org.apache.kafka.connect.mirror.MirrorSourceConnector"
Add the following container definitions.
→ kibana is only there for searching/viewing data, so I judged it doesn't need to talk to kafka and left it off the "kafka-demo-net" network.
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch-wolfi:8.18.0
#platform: linux/x86_64
ports:
- 9200:9200
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- cluster.name=docker-cluster
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
networks:
- kafka-demo-net
kibana:
image: docker.elastic.co/kibana/kibana:8.18.0
#platform: linux/x86_64
environment:
ELASTICSEARCH_HOSTS: http://elasticsearch:9200
ports:
- 5601:5601
#networks:
# - kafka-demo-net
Restart the containers.
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
ef09ab1655f2 docker.elastic.co/elasticsearch/elasticsearch-wolfi:8.18.0 "/sbin/tini -- /usr/…" About a minute ago Up About a minute 0.0.0.0:9200->9200/tcp, 9300/tcp tutorial-elasticsearch-1
ecee9b8e9d3b docker.elastic.co/kibana/kibana:8.18.0 "/bin/tini -- /usr/l…" 3 minutes ago Up 3 minutes 0.0.0.0:5601->5601/tcp tutorial-kibana-1
↑ these 2 containers have been added
042d24b0f468 mysql:8.0 "docker-entrypoint.s…" 43 minutes ago Up 43 minutes 33060/tcp, 0.0.0.0:3308->3306/tcp tutorial-mysql-sink-1
5321ba94db8a quay.io/debezium/connect:3.1.1.Final "/docker-entrypoint.…" 2 hours ago Up 2 hours 8778/tcp, 0.0.0.0:8083->8083/tcp, 9092/tcp tutorial-connect-1
6b874f3f92c1 quay.io/debezium/kafka:3.1.1.Final "/docker-entrypoint.…" 3 hours ago Up 2 hours 0.0.0.0:9092->9092/tcp tutorial-kafka-1
9309f284de47 quay.io/debezium/example-mysql:3.1.1.Final "/entrypoint.sh mysq…" 3 hours ago Up 2 hours 0.0.0.0:3306->3306/tcp, 33060-33061/tcp tutorial-mysql-1
8f5ee8b0ffbd quay.io/debezium/zookeeper:3.1.1.Final "/docker-entrypoint.…" 3 hours ago Up 2 hours 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp tutorial-zookeeper-1
Add the connector. (The detailed settings are somewhat slapdash...)
# cat register-opensearch-sink.json
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "dbserver1.inventory.customers",
"key.ignore": "true",
"schema.ignore": "true",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"name": "elasticsearch-sink",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": true,
"value.converter.schemas.enable": true
}
}
# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-opensearch-sink.json
{"name":"opensearch-sink-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"1","topics":"dbserver1.inventory.customers","key.ignore":"true","schema.ignore":"true","connection.url":"http://elasticsearch:9200","type.name":"_doc","key.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","auto.create":"true","auto.evolve":"true","auto.evolve.schemas":"true","name":"opensearch-sink-connector"},"tasks":[],"type":"sink"}
# curl http://localhost:8083/connectors
["opensearch-sink-connector","mysql-sink-connector","inventory-connector"]
# curl -i -H "Accept:application/json" http://localhost:8083/connectors/opensearch-sink-connector/status
{"name":"opensearch-sink-connector","connector":{"state":"RUNNING","worker_id":"172.20.0.7:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.20.0.7:8083"}],"type":"sink"}
Insert a record.
docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
INSERT INTO inventory.customers(first_name, last_name, email) VALUES('Hoge', 'Fuga3', 'fuga3.hoge@demo.org');
Check the OpenSearch index
# curl -XGET 'localhost:9200/_cat/indices?v'
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size dataset.size
yellow open dbserver1.inventory.customers UOH6a_YPRienxo8TtWI6_A 1 1 8 0 20.9kb 20.9kb 20.9kb
The "dbserver1.inventory.customers" index has been created.
curl -XGET 'localhost:9200/dbserver1.inventory.customers/_search'
{"took":80,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":8,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+0","_score":1.0,"_source":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746341249000,"snapshot":"first_in_data_collection","db":"inventory","sequence":null,"ts_us":1746341249000000,"ts_ns":1746341249000000000,"table":"customers","server_id":0,"gtid":null,"file":"binlog.000003","pos":157,"row":0,"thread":null,"query":null},"transaction":null,"op":"r","ts_ms":1746341249993,"ts_us":1746341249993074,"ts_ns":1746341249993074126}},{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+1","_score":1.0,"_source":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746341249000,"snapshot":"true","db":"inventory","sequence":null,"ts_us":1746341249000000,"ts_ns":1746341249000000000,"table":"customers","server_id":0,"gtid":null,"file":"binlog.000003","pos":157,"row":0,"thread":null,"query":null},"transaction":null,"op":"r","ts_ms":1746341249993,"ts_us":1746341249993500,"ts_ns":1746341249993500501}},{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+2","_score":1.0,"_source":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746341249000,"snapshot":"true","db":"inventory","sequence":null,"ts_us":1746341249000000,"ts_ns":1746341249000000000,"table":"customers","server_id":0,"gtid":null,"file":"binlog.000003","pos":157,"row":0,"thread":null,"query":null},"transaction":null,"op":"r","ts_ms":1746341249994,"ts_us":1746341249994315,"ts_ns":1746341249994315793}},{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+3","_score":1.0,"_source":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746341249000,"snapshot":"last_in_data_collection","db":"inventory","sequence":null,"ts_us":1746341249000000,"ts_ns":1746341249000000000,"table":"customers","server_id":0,"gtid":null,"file":"binlog.000003","pos":157,"row":0,"thread":null,"query":null},"transaction":null,"op":"r","ts_ms":1746341249994,"ts_us":1746341249994462,"ts_ns":1746341249994462918}},{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+4","_score":1.0,"_source":{"before":null,"after":{"id":1005,"first_name":"Hoge","last_name":"Fuga1","email":"fuga.hoge@demo.org"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746344616000,"snapshot":"false","db":"inventory","sequence":null,"ts_us":1746344616000000,"ts_ns":1746344616000000000,"table":"customers","server_id":1,"gtid":null,"file":"binlog.000003","pos":392,"row":0,"thread":18,"query":null},"transaction":null,"op":"c","ts_ms":1746344616957,"ts_us":1746344616957490,"ts_ns":1746344616957490921}},{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+6","_score":1.0,"_source":{"before":null,"after":{"id":1006,"first_name":"Hoge","last_name":"Fuga2","email":"fuga2.hoge
@demo.org"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746347664000,"snapshot":"false","db":"inventory","sequence":null,"ts_us":1746347664000000,"ts_ns":1746347664000000000,"table":"customers","server_id":1,"gtid":null,"file":"binlog.000003","pos":1125,"row":0,"thread":20,"query":null},"transaction":null,"op":"c","ts_ms":1746347664616,"ts_us":1746347664616974,"ts_ns":1746347664616974554}},{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+5","_score":1.0,"_source":{"before":{"id":1005,"first_name":"Hoge","last_name":"Fuga1","email":"fuga.hoge@demo.org"},"after":{"id":1005,"first_name":"Hoge","last_name":"Fuga1","email":"fuga.hoge.update@demo.org"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746345642000,"snapshot":"false","db":"inventory","sequence":null,"ts_us":1746345642000000,"ts_ns":1746345642000000000,"table":"customers","server_id":1,"gtid":null,"file":"binlog.000003","pos":740,"row":0,"thread":18,"query":null},"transaction":null,"op":"u","ts_ms":1746345642566,"ts_us":1746345642566122,"ts_ns":1746345642566122215}},{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+7","_score":1.0,"_source":{"before":{"id":1006,"first_name":"Hoge","last_name":"Fuga2","email":"fuga2.hoge@demo.org"},"after":{"id":1006,"first_name":"Hoge","last_name":"Fuga2","email":"fuga2.hoge.update@demo.org"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746347991000,"snapshot":"false","db":"inventory","sequence":null,"ts_us":1746347991000000,"ts_ns":1746347991000000000,"table":"customers","server_id":1,"gtid":null,"file":"binlog.000003","pos":1474,"row":0,"thread":20,"query":null},"transaction":null,"op":"u","ts_ms":1746347991207,"ts_us":1746347991207050,"ts_ns":1746347991207050969}}]}}
Hmm...
I was expecting just the customers records to be carried over, but the change capture events themselves are being synced as-is.
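If I revisit this, the likely fix is Debezium's ExtractNewRecordState single message transform (the "unwrap" SMT), which flattens the change event envelope down to the after-state row before the sink writes it. Roughly, it should just be a matter of adding keys like the following to the "config" section of register-opensearch-sink.json and re-registering the connector. An untested sketch (the SMT class ships with Debezium, though it may also need to be visible on the Elasticsearch connector's plugin path):
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
# curl -X DELETE http://localhost:8083/connectors/opensearch-sink-connector
# curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-opensearch-sink.json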
That's it for now.