0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

WIP: CDC(Debezium)に触れてみた

Posted at

はじめに

マイクロサービスに関心を抱いている。

実現方法など調べているうちに、
アプリと別サービス間を疎結合(Transactional Outboxパターンなど)にする手法の1つとして「CDC」というものがあることが分かった。

その中の方法に「Debezium」があり、知識としては何ができるか大体わかってるつもりだが実際に触ったことなかったので、この機会に触れてみようと思う。

実施内容

  • 事前準備(環境構築)など非常に面倒に感じる性格なため、
    まずは公式(?)が用意している「debezium-examples」を導入して手軽にはじめる。

  • 軽く動かしてみて挙動などが何となく分かったら、
    DBサーバを別途構築し[debezium-examples]のDBサーバの変更イベントより、別DBサーバへデータ同期をしてみる。

  • 余力があれば(運用観点で現実的にありそうな)
    OpenSearch」を別途構築し、こちらにもデータ同期を行い、検索&表示ができることの確認をしたい。

Debeziumについて

Kafkaを軸にしてCDCを実現してくれているようだ。
詳細はググるとたくさん良記事が出てくるので、そちらを参照ください。(TBD:余力あれば追記する)

「debezium-examples」導入

「debezium-examples」のgitクローンをする。

# git clone https://github.com/debezium/debezium-examples.git
# cd debezium-examples/tutorial 

コンテナ起動

今回はチュートリアルに記載されている「Using MySQL」にてデモ環境を構築してみる。
スクリーンショット 2025-05-04 14.12.36.png

DEBEZIUM_VERSIONには、
Dockerイメージ「quay.io/debezium/(zookeeper|kafka|example-mysql)」の最新バージョンを指定する。

「3.1.1.Final」が最新のようだ。
スクリーンショット 2025-05-04 14.15.26.png

# export DEBEZIUM_VERSION=3.1.1.Final
コンテナ起動前にdocker-compose定義を少々変更 - ネットワーク設定をlinksではなくnetworksに変更
  • 私の場合はMacOS(M3)だったため、docker-compose.yamlに「platform」を指定する必要があった。※Java起因(一例)などでコンテナが異常終了してしまった
diff --git a/tutorial/docker-compose-mysql.yaml b/tutorial/docker-compose-mysql.yaml
index 4be3508..4228ea2 100644
--- a/tutorial/docker-compose-mysql.yaml
+++ b/tutorial/docker-compose-mysql.yaml
@@ -2,28 +2,38 @@ version: '2'
 services:
   zookeeper:
     image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
+    platform: linux/x86_64
     ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
+    networks:
+      - kafka-demo-net
   kafka:
     image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
+    platform: linux/x86_64
     ports:
      - 9092:9092
     links:
      - zookeeper
     environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
+    networks:
+      - kafka-demo-net
   mysql:
     image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
+    platform: linux/x86_64
     ports:
      - 3306:3306
     environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
+    networks:
+      - kafka-demo-net
   connect:
     image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
+    platform: linux/x86_64
     ports:
      - 8083:8083
     links:
@@ -35,3 +45,8 @@ services:
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
+    networks:
+      - kafka-demo-net
+networks:
+  kafka-demo-net:
+    driver: bridge

# docker-compose -f docker-compose-mysql.yaml up

docker-compose起動後のコンテナ一覧は下記の通り。

CONTAINER ID   IMAGE                                        COMMAND                   CREATED          STATUS         PORTS                                                                    NAMES
5321ba94db8a   quay.io/debezium/connect:3.1.1.Final         "/docker-entrypoint.…"   36 seconds ago   Up 35 seconds   8778/tcp, 0.0.0.0:8083->8083/tcp, 9092/tcp                               tutorial-connect-1
6b874f3f92c1   quay.io/debezium/kafka:3.1.1.Final           "/docker-entrypoint.…"   20 minutes ago   Up 35 seconds   0.0.0.0:9092->9092/tcp                                                   tutorial-kafka-1
9309f284de47   quay.io/debezium/example-mysql:3.1.1.Final   "/entrypoint.sh mysq…"   20 minutes ago   Up 35 seconds   0.0.0.0:3306->3306/tcp, 33060-33061/tcp                                  tutorial-mysql-1
8f5ee8b0ffbd   quay.io/debezium/zookeeper:3.1.1.Final       "/docker-entrypoint.…"   20 minutes ago   Up 35 seconds   0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp   tutorial-zookeeper-1
Mysqlサーバ確認

Mysqlサーバへログイン

 # docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

データベース一覧確認

mysql> SHOW DATABASES;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| inventory          | ←変更チャプチャ対象のデータベース(後述)
| performance_schema |
+--------------------+
3 rows in set (0.00 sec)

テーブル一覧

mysql> USE inventory;
Database changed
mysql> 
mysql> 
mysql> SHOW TABLES;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+
6 rows in set (0.01 sec)

レコード数の確認。初期レコードは登録されている状態のようだ。

テーブル名 件数
addresses 7
customers 4
geom 3
orders 4
products 9
products_on_hand 9
mysql> select * from addresses;
+----+-------------+---------------------------+------------+--------------+-------+----------+
| id | customer_id | street                    | city       | state        | zip   | type     |
+----+-------------+---------------------------+------------+--------------+-------+----------+
| 10 |        1001 | 3183 Moore Avenue         | Euless     | Texas        | 76036 | SHIPPING |
| 11 |        1001 | 2389 Hidden Valley Road   | Harrisburg | Pennsylvania | 17116 | BILLING  |
| 12 |        1002 | 281 Riverside Drive       | Augusta    | Georgia      | 30901 | BILLING  |
| 13 |        1003 | 3787 Brownton Road        | Columbus   | Mississippi  | 39701 | SHIPPING |
| 14 |        1003 | 2458 Lost Creek Road      | Bethlehem  | Pennsylvania | 18018 | SHIPPING |
| 15 |        1003 | 4800 Simpson Square       | Hillsdale  | Oklahoma     | 73743 | BILLING  |
| 16 |        1004 | 1289 University Hill Road | Canehill   | Arkansas     | 72717 | LIVING   |
+----+-------------+---------------------------+------------+--------------+-------+----------+
7 rows in set (0.01 sec)

mysql> 
mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)

mysql> select * from geom;
+----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
| id | g                                                                                                                                                                                                    | h          |
+----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|  1 | 0x000000000101000000000000000000F03F000000000000F03F                                                                                                                                                 | NULL       |
|  2 | 0x000000000102000000020000000000000000000040000000000000F03F00000000000018400000000000001840                                                                                                         | NULL       |
|  3 | 0x0000000001030000000100000005000000000000000000000000000000000014400000000000000040000000000000144000000000000000400000000000001C4000000000000000000000000000001C4000000000000000000000000000001440 | NULL       |
+----+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
3 rows in set (0.01 sec)

mysql> select * from orders;
+--------------+------------+-----------+----------+------------+
| order_number | order_date | purchaser | quantity | product_id |
+--------------+------------+-----------+----------+------------+
|        10001 | 2016-01-16 |      1001 |        1 |        102 |
|        10002 | 2016-01-17 |      1002 |        2 |        105 |
|        10003 | 2016-02-19 |      1002 |        2 |        106 |
|        10004 | 2016-02-21 |      1003 |        1 |        107 |
+--------------+------------+-----------+----------+------------+
4 rows in set (0.00 sec)

mysql> select * from products;
+-----+--------------------+---------------------------------------------------------+--------+
| id  | name               | description                                             | weight |
+-----+--------------------+---------------------------------------------------------+--------+
| 101 | scooter            | Small 2-wheel scooter                                   |   3.14 |
| 102 | car battery        | 12V car battery                                         |    8.1 |
| 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 |    0.8 |
| 104 | hammer             | 12oz carpenter's hammer                                 |   0.75 |
| 105 | hammer             | 14oz carpenter's hammer                                 |  0.875 |
| 106 | hammer             | 16oz carpenter's hammer                                 |      1 |
| 107 | rocks              | box of assorted rocks                                   |    5.3 |
| 108 | jacket             | water resistent black wind breaker                      |    0.1 |
| 109 | spare tire         | 24 inch spare tire                                      |   22.2 |
+-----+--------------------+---------------------------------------------------------+--------+
9 rows in set (0.00 sec)

mysql> select * from products_on_hand;
+------------+----------+
| product_id | quantity |
+------------+----------+
|        101 |        3 |
|        102 |        8 |
|        103 |       18 |
|        104 |        4 |
|        105 |        5 |
|        106 |        0 |
|        107 |       44 |
|        108 |        2 |
|        109 |        5 |
+------------+----------+
9 rows in set (0.01 sec)


動作確認

Kafkaコネクタ登録

チュートリアルにある「Start MySQL connector」を実行してみよう。ただその前に...

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

上記は一体何を(目的は?)しているのだろうか。

REST APIのエンドポイントから、kafka-connectに対して何か命令しているようだ。

新しいコネクタを作成し、成功した場合は現在のコネクタ情報を返します。 リバランス処理中の場合、またはコネクタが既に存在する場合は、 を返します。409 (Conflict)

コネクタの登録内容は、チュートリアル用の「register-mysql.json」に定義されている。

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "topic.prefix": "dbserver1",
        "database.include.list": "inventory",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
}

主に、チュートリアル用のmysqlサーバの設定情報が定義されている。
コネクタには「io.debezium.connector.mysql.MySqlConnector(https://debezium.io/documentation/reference/stable/connectors/mysql.html)」が指定されており、プロパティなどコネクタごとに異なっているものと推測する。(公式ドキュメントを見れば何をセットすべきか書いている)

スクリーンショット 2025-05-04 15.11.12.png

チュートリアルの定義を要約すると、
Mysqlサーバ(ホスト:mysql、ポート:3306、rootユーザー/rootパスワード:debezium/dbz)のデータベース「inventory」のみ変更をキャプチャする、になりそうだ。

コネクタプラグインについて

kafka-connectは下記機能を提供しているとある。
スクリーンショット 2025-05-04 14.55.40.png

ざっくり、DBとKafkaトピックと紐付けするための命令になっていそうだ。

起動中の Connector プラグインを調べる。

# curl http://localhost:8083/connector-plugins | jq
[
  {
    "class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "3.1.1.Final"
  },
  {
    "class": "io.debezium.connector.mongodb.MongoDbSinkConnector",
    "type": "sink",
    "version": "3.1.1.Final"
  },
  {
    "class": "io.debezium.connector.db2.Db2Connector",
    "type": "source",
    "version": "3.1.1.Final"
  },
  {
    "class": "io.debezium.connector.db2as400.As400RpcConnector",
    "type": "source",
    "version": "3.1.1.Final"
  },
  {
    "class": "io.debezium.connector.informix.InformixConnector",
    "type": "source",
    "version": "3.1.1.Final"
  },
  {
    "class": "io.debezium.connector.mariadb.MariaDbConnector",
    "type": "source",
    "version": "3.1.1.Final"
  },
  {
    "class": "io.debezium.connector.mongodb.MongoDbConnector",
    "type": "source",
    "version": "3.1.1.Final"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "3.1.1.Final"
  },
  {
    "class": "io.debezium.connector.oracle.OracleConnector",
    "type": "source",
    "version": "3.1.1.Final"
  },
  {
    "class": "io.debezium.connector.postgresql.PostgresConnector",
    "type": "source",
    "version": "3.1.1.Final"
  },
  {
    "class": "io.debezium.connector.spanner.SpannerConnector",
    "type": "source",
    "version": "3.1.1.Final"
  },
  {
    "class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "type": "source",
    "version": "3.1.1.Final"
  },
  {
    "class": "io.debezium.connector.vitess.VitessConnector",
    "type": "source",
    "version": "3.1.1.Final"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "3.9.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "3.9.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "3.9.0"
  }
]

アクティブなコネクタのリストを確認する。

# curl http://localhost:8083/connectors
[]

まだ何も登録してないので空で応答される。

では、戻って「Start MySQL connector」を実行してみよう。

# curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

HTTP/1.1 201 Created
Date: Sun, 04 May 2025 06:47:27 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 501
Server: Jetty(9.4.56.v20240826)

{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","topic.prefix":"dbserver1","database.include.list":"inventory","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","name":"inventory-connector"},"tasks":[],"type":"source"}%       

登録は成功したようだ。アクティブなコネクタに登録したコネクタ名が表示された。

# curl http://localhost:8083/connectors                                                                                                  
["inventory-connector"]

Debeziumトピックからメッセージを消費

チュートリアルの下記コマンドにて変更キャプチャを確認できそうだ。
一度実行してみよう。

# docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers


{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Key"
  },
  "payload": {
    "id": 1001
  }

  ..(省略)
}

出力が長すぎたので省略した。。
以下属性が配列で返される構造のようだ。

[
  "payload",
  "schema"
]
[
  "payload",
  "schema"
]

「--from-beginning」は、offsetの最初から読み込むためのオプションのため外すとリアルタイムな変更のみにできそうだ。

「--topic」には現在何を指定できるのだろう。何がトピックに用意されているものか確認してみる。
トピック一覧を見るには「/bin/kafka-topics.sh」を使うのが良さそうだ。

上記で実行した「/bin/kafka-console-consumer.sh」は、kafkaコンテナ内にあるbinを指定して実行していた。おそらく同じ場所に「/bin/kafka-topics.sh」があると推測。

以下実行することでトピックを確認することができた。

# docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-topics.sh \
  --bootstrap-server kafka:9092 \
  --list

dbserver1
dbserver1.inventory.addresses
dbserver1.inventory.customers
dbserver1.inventory.geom
dbserver1.inventory.orders
dbserver1.inventory.products
dbserver1.inventory.products_on_hand
my_connect_configs
my_connect_offsets
my_connect_statuses
schema-changes.inventory

では、実際に「customers」テーブルのレコードを新規追加(insert)した際の変更キャプチャの様子を見てみよう。

変更キャプチャを監視しつつ、

# docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --property print.key=true \
    --topic dbserver1.inventory.customers

別タブでMysqlサーバへ接続。

# docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

mysql> USE inventory;
Database changed

mysql> INSERT INTO inventory.customers(first_name, last_name, email) VALUES('Hoge', 'Fuga1', 'fuga.hoge@demo.org');
Query OK, 1 row affected (0.03 sec)

変更キャプチャに出力あり!

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,first,first_in_data_collection,last_in_data_collection,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","version":1,"field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":2},"payload":{"before":null,"after":{"id":1005,"first_name":"Hoge","last_name":"Fuga1","email":"fuga.hoge@demo.org"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746344616000,"snapshot":"false","db":"inventory","sequence":null,"ts_us":1746344616000000,"ts_ns":1746344616000000000,"table":"customers","server_id":1,"gtid":null,"file":"binlog.000003","pos":392,"row":0,"thread":18,"query":null},"transaction":null,"op":"c","ts_ms":1746344616957,"ts_us":1746344616957490,"ts_ns":1746344616957490921}}

加工しないと辛い。配列2要素目に有益な情報(before/after)がある。
スクリーンショット 2025-05-04 16.56.15.png

  "payload": {
    "before": null,
    "after": {
      "id": 1005,
      "first_name": "Hoge",
      "last_name": "Fuga1",
      "email": "fuga.hoge@demo.org"
    },

お次は更新(update)を見てみる。emailを更新。

mysql> UPDATE inventory.customers SET email = 'fuga.hoge.update@demo.org' WHERE id = 1005;

Query OK, 1 row affected (0.02 sec)
Rows matched: 1  Changed: 1  Warnings: 0

変更キャプチャ。

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}}	{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,first,first_in_data_collection,last_in_data_collection,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","version":1,"field":"source"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"int64","optional":true,"field":"ts_us"},{"type":"int64","optional":true,"field":"ts_ns"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":2},"payload":{"before":{"id":1005,"first_name":"Hoge","last_name":"Fuga1","email":"fuga.hoge@demo.org"},"after":{"id":1005,"first_name":"Hoge","last_name":"Fuga1","email":"fuga.hoge.update@demo.org"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746345642000,"snapshot":"false","db":"inventory","sequence":null,"ts_us":1746345642000000,"ts_ns":1746345642000000000,"table":"customers","server_id":1,"gtid":null,"file":"binlog.000003","pos":740,"row":0,"thread":18,"query":null},"transaction":null,"op":"u","ts_ms":1746345642566,"ts_us":1746345642566122,"ts_ns":1746345642566122215}}

更新の場合は、beforeに変更前の情報が入っている。
スクリーンショット 2025-05-04 17.01.49.png

別DBサーバへのデータ同期

docker-compose-mysql.yamlに同期用Mysqlサーバを追加する。

  mysql-sink:
    image: mysql:8.0
    platform: linux/x86_64
    ports:
      - "3308:3306"
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: database_b
    volumes:
      - ./mysql-sink-init.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - kafka-demo-net

# cat mysql-sink-init.sql
CREATE DATABASE IF NOT EXISTS database_b;
USE database_b;
CREATE TABLE users (
  id INT PRIMARY KEY AUTO_INCREMENT,
  first_name VARCHAR(100),
  last_name VARCHAR(100),
  email VARCHAR(100)
);
# cat register-mysql-sink.json 
{
  "name": "mysql-sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://mysql-sink:3306/database_b",
    "connection.username": "root",
    "connection.password": "root",
    "topics": "dbserver1.inventory.customers",
    "table.name.format": "users",
    "auto.create": "true",
    "auto.evolve": "true",
    "auto.evolve.schemas": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "primary.key.fields": "id",
    "primary.key.mode": "record_key",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": true,
    "value.converter.schemas.enable": true
  }
}

dockerを再起動

CONTAINER ID   IMAGE                                        COMMAND                   CREATED         STATUS         PORTS                                                                    NAMES
042d24b0f468   mysql:8.0                                    "docker-entrypoint.s…"   3 seconds ago   Up 3 seconds   33060/tcp, 0.0.0.0:3308->3306/tcp                                        tutorial-mysql-sink-1
↑コンテナが追加される

5321ba94db8a   quay.io/debezium/connect:3.1.1.Final         "/docker-entrypoint.…"   2 hours ago     Up 2 hours     8778/tcp, 0.0.0.0:8083->8083/tcp, 9092/tcp                               tutorial-connect-1
6b874f3f92c1   quay.io/debezium/kafka:3.1.1.Final           "/docker-entrypoint.…"   2 hours ago     Up 2 hours     0.0.0.0:9092->9092/tcp                                                   tutorial-kafka-1
9309f284de47   quay.io/debezium/example-mysql:3.1.1.Final   "/entrypoint.sh mysq…"   2 hours ago     Up 2 hours     0.0.0.0:3306->3306/tcp, 33060-33061/tcp                                  tutorial-mysql-1
8f5ee8b0ffbd   quay.io/debezium/zookeeper:3.1.1.Final       "/docker-entrypoint.…"   2 hours ago     Up 2 hours     0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp   tutorial-zookeeper-1

同期用Mysqlサーバ確認

# docker-compose -f docker-compose-mysql.yaml exec mysql-sink bash -c 'mysql -u root -proot database_b'

mysql> SHOW DATABASES;
+--------------------+
| Database           |
+--------------------+
| database_b         |
| information_schema |
| mysql              |
| performance_schema |
| sys                |
+--------------------+
5 rows in set (0.00 sec)


mysql> USE database_b
Database changed

mysql> SHOW TABLES;
+----------------------+
| Tables_in_database_b |
+----------------------+
| users                |
+----------------------+
1 row in set (0.00 sec)

mysql> select * from users;
Empty set (0.01 sec)

同期用コネクタを追加する。

# curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql-sink.json

HTTP/1.1 201 Created
Date: Sun, 04 May 2025 08:29:57 GMT
Location: http://localhost:8083/connectors/mysql-sink-connector
Content-Type: application/json
Content-Length: 744
Server: Jetty(9.4.56.v20240826)

{"name":"mysql-sink-connector","config":{"connector.class":"io.debezium.connector.jdbc.JdbcSinkConnector","tasks.max":"1","connection.url":"jdbc:mysql://mysql-sink:3306/database_b","connection.username":"root","connection.password":"root","topics":"dbserver1.inventory.customers","table.name.format":"users","auto.create":"true","auto.evolve":"true","auto.evolve.schemas":"true","insert.mode":"upsert","delete.enabled":"true","primary.key.fields":"id","primary.key.mode":"record_key","key.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","name":"mysql-sink-connector"},"tasks":[],"type":"sink"}

# curl -i -H "Accept:application/json" http://localhost:8083/connectors/mysql-sink-connector/status 

{"name":"mysql-sink-connector","connector":{"state":"RUNNING","worker_id":"172.20.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.20.0.5:8083"}],"type":"sink"}

# curl http://localhost:8083/connectors                                          
["mysql-sink-connector","inventory-connector"]

では、新規データを登録して同期されるか見てみよう。

# docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

mysql> USE inventory;

mysql> INSERT INTO inventory.customers(first_name, last_name, email) VALUES('Hoge', 'Fuga2', 'fuga2.hoge@demo.org');

Query OK, 1 row affected (0.01 sec)

同期先Mysqlを確認。

# docker-compose -f docker-compose-mysql.yaml exec mysql-sink bash -c 'mysql -u root -proot database_b'

mysql> select * from users;
+------+------------+-----------+---------------------------+
| id   | first_name | last_name | email                     |
+------+------------+-----------+---------------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com     |
| 1002 | George     | Bailey    | gbailey@foobar.com        |
| 1003 | Edward     | Walker    | ed@walker.com             |
| 1004 | Anne       | Kretchmar | annek@noanswer.org        |
| 1005 | Hoge       | Fuga1     | fuga.hoge.update@demo.org |
| 1006 | Hoge       | Fuga2     | fuga2.hoge@demo.org       |
+------+------------+-----------+---------------------------+
6 rows in set (0.00 sec)

全レコードが登録されていた。
コネクタ追加した時点で追加されていたのだろう...
新規レコードも存在するため同期は成功していそうだ。

更新も試してみよう。先ほど同様にemailを更新する。

# docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

mysql> USE inventory;

mysql> UPDATE inventory.customers SET email = 'fuga2.hoge.update@demo.org' WHERE id = 1006;

Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

同期先Mysqlを確認。同期されていた。

mysql> select * from users;
+------+------------+-----------+----------------------------+
| id   | first_name | last_name | email                      |
+------+------------+-----------+----------------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com      |
| 1002 | George     | Bailey    | gbailey@foobar.com         |
| 1003 | Edward     | Walker    | ed@walker.com              |
| 1004 | Anne       | Kretchmar | annek@noanswer.org         |
| 1005 | Hoge       | Fuga1     | fuga.hoge.update@demo.org  |
| 1006 | Hoge       | Fuga2     | fuga2.hoge.update@demo.org |
+------+------------+-----------+----------------------------+
6 rows in set (0.01 sec)

「OpenSearch」へのデータ同期

kafkaコネクタにOpeaSearch同期用プラグインをインストールする必要がある。
https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch

「Self-Hosted」のDownloadで取得。
スクリーンショット 2025-05-04 18.18.04.png

tutorial配下に「connect-plugins」ディレクトリ作成し、ダウンロードしたものを移動する。

# ls -l tutorial/connect-plugins 
total 0
drwxr-xr-x@ 8 tetsu  staff  256  5  4 18:15 confluentinc-kafka-connect-elasticsearch-15.0.0

そしてconnectコンテナのvolumesにてplugin置き場に追加すると使用することができる。

  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    (..省略)
    volumes:
      - ./connect-plugins/confluentinc-kafka-connect-elasticsearch-15.0.0:/kafka/connect/confluentinc-kafka-connect-elasticsearch-15.0.0

プラグイン確認。

# curl -s localhost:8083/connector-plugins | jq '.[].class' 
"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
↑これが追加される。

"io.debezium.connector.jdbc.JdbcSinkConnector"
"io.debezium.connector.mongodb.MongoDbSinkConnector"
"io.debezium.connector.db2.Db2Connector"
"io.debezium.connector.db2as400.As400RpcConnector"
"io.debezium.connector.informix.InformixConnector"
"io.debezium.connector.mariadb.MariaDbConnector"
"io.debezium.connector.mongodb.MongoDbConnector"
"io.debezium.connector.mysql.MySqlConnector"
"io.debezium.connector.oracle.OracleConnector"
"io.debezium.connector.postgresql.PostgresConnector"
"io.debezium.connector.spanner.SpannerConnector"
"io.debezium.connector.sqlserver.SqlServerConnector"
"io.debezium.connector.vitess.VitessConnector"
"org.apache.kafka.connect.mirror.MirrorCheckpointConnector"
"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"
"org.apache.kafka.connect.mirror.MirrorSourceConnector"

以下コンテナ定義を追加。
→kibanaはデータ検索目的のコンテナなので、kafka連携は不要と判断し「kafka-demo-net」ネットワークには属さない設定にした。

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch-wolfi:8.18.0
    #platform: linux/x86_64
    ports:
      - 9200:9200
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - cluster.name=docker-cluster
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    networks:
      - kafka-demo-net
  kibana:
    image: docker.elastic.co/kibana/kibana:8.18.0
    #platform: linux/x86_64
    environment:
      ELASTICSEARCH_HOSTS: http://elasticsearch:9200
    ports:
       - 5601:5601
    #networks:
    #  - kafka-demo-net

コンテナ再起動。

CONTAINER ID   IMAGE                                                        COMMAND                   CREATED              STATUS              PORTS                                                                    NAMES
ef09ab1655f2   docker.elastic.co/elasticsearch/elasticsearch-wolfi:8.18.0   "/sbin/tini -- /usr/…"   About a minute ago   Up About a minute   0.0.0.0:9200->9200/tcp, 9300/tcp                                         tutorial-elasticsearch-1
ecee9b8e9d3b   docker.elastic.co/kibana/kibana:8.18.0                       "/bin/tini -- /usr/l…"   3 minutes ago        Up 3 minutes        0.0.0.0:5601->5601/tcp                                                   tutorial-kibana-1
↑2コンテナ追加される

042d24b0f468   mysql:8.0                                                    "docker-entrypoint.s…"   43 minutes ago       Up 43 minutes       33060/tcp, 0.0.0.0:3308->3306/tcp                                        tutorial-mysql-sink-1
5321ba94db8a   quay.io/debezium/connect:3.1.1.Final                         "/docker-entrypoint.…"   2 hours ago          Up 2 hours          8778/tcp, 0.0.0.0:8083->8083/tcp, 9092/tcp                               tutorial-connect-1
6b874f3f92c1   quay.io/debezium/kafka:3.1.1.Final                           "/docker-entrypoint.…"   3 hours ago          Up 2 hours          0.0.0.0:9092->9092/tcp                                                   tutorial-kafka-1
9309f284de47   quay.io/debezium/example-mysql:3.1.1.Final                   "/entrypoint.sh mysq…"   3 hours ago          Up 2 hours          0.0.0.0:3306->3306/tcp, 33060-33061/tcp                                  tutorial-mysql-1
8f5ee8b0ffbd   quay.io/debezium/zookeeper:3.1.1.Final                       "/docker-entrypoint.…"   3 hours ago          Up 2 hours          0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp   tutorial-zookeeper-1

コネクタを追加する。(中身の細かな設定は適当..)

# cat register-opensearch-sink.json
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.inventory.customers",
    "key.ignore": "true",
    "schema.ignore": "true",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "name": "elasticsearch-sink",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": true,
    "value.converter.schemas.enable": true
  }
}

# curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-opensearch-sink.json

{"name":"opensearch-sink-connector","config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"1","topics":"dbserver1.inventory.customers","key.ignore":"true","schema.ignore":"true","connection.url":"http://elasticsearch:9200","type.name":"_doc","key.converter":"org.apache.kafka.connect.json.JsonConverter","value.converter":"org.apache.kafka.connect.json.JsonConverter","key.converter.schemas.enable":"true","value.converter.schemas.enable":"true","auto.create":"true","auto.evolve":"true","auto.evolve.schemas":"true","name":"opensearch-sink-connector"},"tasks":[],"type":"sink"}

# curl http://localhost:8083/connectors
["opensearch-sink-connector","mysql-sink-connector","inventory-connector"]

# curl -i -H "Accept:application/json" http://localhost:8083/connectors/opensearch-sink-connector/status

{"name":"opensearch-sink-connector","connector":{"state":"RUNNING","worker_id":"172.20.0.7:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.20.0.7:8083"}],"type":"sink"}

レコード追加。

docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'


INSERT INTO inventory.customers(first_name, last_name, email) VALUES('Hoge', 'Fuga3', 'fuga3.hoge@demo.org');

OpenSearchのインデックス確認

# curl -XGET 'localhost:9200/_cat/indices?v'         
health status index                         uuid                   pri rep docs.count docs.deleted store.size pri.store.size dataset.size
yellow open   dbserver1.inventory.customers UOH6a_YPRienxo8TtWI6_A   1   1          8            0     20.9kb         20.9kb       20.9kb

「dbserver1.inventory.customers」インデックスが登録されている。

curl -XGET 'localhost:9200/dbserver1.inventory.customers/_search'

{"took":80,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":8,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+0","_score":1.0,"_source":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746341249000,"snapshot":"first_in_data_collection","db":"inventory","sequence":null,"ts_us":1746341249000000,"ts_ns":1746341249000000000,"table":"customers","server_id":0,"gtid":null,"file":"binlog.000003","pos":157,"row":0,"thread":null,"query":null},"transaction":null,"op":"r","ts_ms":1746341249993,"ts_us":1746341249993074,"ts_ns":1746341249993074126}},{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+1","_score":1.0,"_source":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746341249000,"snapshot":"true","db":"inventory","sequence":null,"ts_us":1746341249000000,"ts_ns":1746341249000000000,"table":"customers","server_id":0,"gtid":null,"file":"binlog.000003","pos":157,"row":0,"thread":null,"query":null},"transaction":null,"op":"r","ts_ms":1746341249993,"ts_us":1746341249993500,"ts_ns":1746341249993500501}},{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+2","_score":1.0,"_source":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746341249000,"snapshot":"true","db":"inventory","sequence":null,"ts_us":1746341249000000,"ts_ns":1746341249000000000,"table":"customers","server_id":0,"gtid":null,"file":"binlog.000003","pos":157,"row":0,"thread":null,"query":null},"transaction":null,"op":"r","ts_ms":1746341249994,"ts_us":1746341249994315,"ts_ns":1746341249994315793}},{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+3","_score":1.0,"_source":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746341249000,"snapshot":"last_in_data_collection","db":"inventory","sequence":null,"ts_us":1746341249000000,"ts_ns":1746341249000000000,"table":"customers","server_id":0,"gtid":null,"file":"binlog.000003","pos":157,"row":0,"thread":null,"query":null},"transaction":null,"op":"r","ts_ms":1746341249994,"ts_us":1746341249994462,"ts_ns":1746341249994462918}},{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+4","_score":1.0,"_source":{"before":null,"after":{"id":1005,"first_name":"Hoge","last_name":"Fuga1","email":"fuga.hoge@demo.org"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746344616000,"snapshot":"false","db":"inventory","sequence":null,"ts_us":1746344616000000,"ts_ns":1746344616000000000,"table":"customers","server_id":1,"gtid":null,"file":"binlog.000003","pos":392,"row":0,"thread":18,"query":null},"transaction":null,"op":"c","ts_ms":1746344616957,"ts_us":1746344616957490,"ts_ns":1746344616957490921}},{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+6","_score":1.0,"_source":{"before":null,"after":{"id":1006,"first_name":"Hoge","last_name":"Fuga2","email":"fuga2.hoge@demo.org"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746347664000,"snapshot":"false","db":"inventory","sequence":null,"ts_us":1746347664000000,"ts_ns":1746347664000000000,"table":"customers","server_id":1,"gtid":null,"file":"binlog.000003","pos":1125,"row":0,"thread":20,"query":null},"transaction":null,"op":"c","ts_ms":1746347664616,"ts_us":1746347664616974,"ts_ns":1746347664616974554}},{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+5","_score":1.0,"_source":{"before":{"id":1005,"first_name":"Hoge","last_name":"Fuga1","email":"fuga.hoge@demo.org"},"after":{"id":1005,"first_name":"Hoge","last_name":"Fuga1","email":"fuga.hoge.update@demo.org"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746345642000,"snapshot":"false","db":"inventory","sequence":null,"ts_us":1746345642000000,"ts_ns":1746345642000000000,"table":"customers","server_id":1,"gtid":null,"file":"binlog.000003","pos":740,"row":0,"thread":18,"query":null},"transaction":null,"op":"u","ts_ms":1746345642566,"ts_us":1746345642566122,"ts_ns":1746345642566122215}},{"_index":"dbserver1.inventory.customers","_id":"dbserver1.inventory.customers+0+7","_score":1.0,"_source":{"before":{"id":1006,"first_name":"Hoge","last_name":"Fuga2","email":"fuga2.hoge@demo.org"},"after":{"id":1006,"first_name":"Hoge","last_name":"Fuga2","email":"fuga2.hoge.update@demo.org"},"source":{"version":"3.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1746347991000,"snapshot":"false","db":"inventory","sequence":null,"ts_us":1746347991000000,"ts_ns":1746347991000000000,"table":"customers","server_id":1,"gtid":null,"file":"binlog.000003","pos":1474,"row":0,"thread":20,"query":null},"transaction":null,"op":"u","ts_ms":1746347991207,"ts_us":1746347991207050,"ts_ns":1746347991207050969}}]}}

うーん、
customesのレコードが単に連携されることを期待していたが、変更キャプチャ自体が連携されている。

一旦、ここまで。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?