概要
本記事では、Azure Database for MySQL をソースとして、Confluent で提供される Debezium を用いて CDC(Change Data Capture)を実行する手順と、その実行結果についてご紹介します。Azure Database for MySQL に対する SQL の実行を Azure Data Studio で実施しています。
本記事は、以下のシリーズの一部です。
引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita
環境準備
Azure Database for MySQL の環境構築
1. Azure Database for MySQL Flexible Server の作成
2. binlog_row_image
をfull
に設定
3. データベースを作成
Confluent Cloud での MySQL CDC Source V2 (Debezium) コネクタの作成
動作検証
1. Primary Key を設定しているテーブルでの動作検証
事前準備
まずは customers テーブルを作成します。
CREATE TABLE customers (
id INT PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT 操作(create events)
以下のようにレコードを INSERT します。
INSERT INTO customers (id, first_name, last_name, email)
VALUES
(1, 'Anne', 'Kretchmar', 'annek@example.org');
Confluent のトピックを確認すると、INSERT イベントが以下のように記録されています。
{
"before": null,
"after": {
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@example.org",
"created_at": "2025-01-05T09:06:23Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "debezium_mysql",
"ts_ms": 1736067983000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "customers",
"server_id": 2287969543,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 1493,
"row": 0,
"thread": 49,
"query": null
},
"op": "c",
"ts_ms": 1736067983350,
"transaction": null
}
UPDATE 操作(update events)
次にレコードを UPDATE します。
-- 既存レコードを更新
UPDATE customers
SET email = 'updated_anne@example.org'
WHERE id = 1;
Confluent のトピックを確認すると、UPDATE イベントが以下のように記録されています。
{
"before": {
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@example.org",
"created_at": "2025-01-05T09:06:23Z"
},
"after": {
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "updated_anne@example.org",
"created_at": "2025-01-05T09:06:23Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "debezium_mysql",
"ts_ms": 1736068123000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "customers",
"server_id": 2287969543,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 1844,
"row": 0,
"thread": 49,
"query": null
},
"op": "u",
"ts_ms": 1736068123843,
"transaction": null
}
DELETE 操作(delete events)
次にレコードを DELETE します。
DELETE FROM customers
WHERE id = 1;
Confluent のトピックを確認すると、DELETE イベントが以下のように記録されています。
{
"before": {
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "updated_anne@example.org",
"created_at": "2025-01-05T09:06:23Z"
},
"after": null,
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "debezium_mysql",
"ts_ms": 1736068158000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "customers",
"server_id": 2287969543,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 2239,
"row": 0,
"thread": 49,
"query": null
},
"op": "d",
"ts_ms": 1736068158834,
"transaction": null
}
""
Primary Key の値を更新(Primary key updates)
Primary key(id)の値を更新する操作を実施します。
-- 通常の INSERT
INSERT INTO customers (id, first_name, last_name, email)
VALUES
(1, 'Anne', 'Kretchmar', 'annek@example.org');
UPDATE customers
SET id = 10
WHERE id = 1;
Confluent のトピックでは、id=1 のレコードを削除→id=10 のレコードを挿入する形でイベントが記録されます。
{
"before": {
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@example.org",
"created_at": "2025-01-05T09:11:25Z"
},
"after": null,
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "debezium_mysql",
"ts_ms": 1736068290000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "customers",
"server_id": 2287969543,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 2939,
"row": 0,
"thread": 49,
"query": null
},
"op": "d",
"ts_ms": 1736068290066,
"transaction": null
}
""
{
"before": null,
"after": {
"id": 10,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@example.org",
"created_at": "2025-01-05T09:11:25Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "debezium_mysql",
"ts_ms": 1736068290000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "customers",
"server_id": 2287969543,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 2939,
"row": 0,
"thread": 49,
"query": null
},
"op": "c",
"ts_ms": 1736068290066,
"transaction": null
}
TRUNCATE 操作(truncate events)
次にテーブルを TRUCATE します。
TRUNCATE TABLE customers;
Confluent のトピックを確認すると、INSERT イベント(op
フィールドの値がc
)が以下のように記録されており、想定外の動作となりました。
{
"before": null,
"after": {
"id": 2,
"first_name": "Bob",
"last_name": "Thomas",
"email": "bobt@example.org",
"created_at": "2025-01-05T09:52:58Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "debezium_mysql",
"ts_ms": 1736070778000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "customers",
"server_id": 2287969543,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 8736,
"row": 0,
"thread": 49,
"query": null
},
"op": "c",
"ts_ms": 1736070778015,
"transaction": null
}
2. 2 つのテーブルを結合した DML の動作検証
事前準備
例として、orders
テーブルとtmp_orders
テーブルを用意します。
CREATE TABLE orders (
order_id INT PRIMARY KEY,
customer_id INT NOT NULL,
amount INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE tmp_orders (
order_id INT PRIMARY KEY,
customer_id INT NOT NULL,
amount INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO orders (order_id, customer_id, amount)
VALUES
(1, 1, 0);
INSERT INTO tmp_orders (order_id, customer_id, amount)
VALUES
(1, 1, 100),
(2, 2, 0);
DELETE 操作 (JOIN 例)
2 つのテーブルを JOIN してorders
テーブルを DELETE します。
DELETE orders
FROM orders
JOIN tmp_orders
ON orders.order_id = tmp_orders.order_id;
{
"before": {
"order_id": 1,
"customer_id": 1,
"amount": 0,
"created_at": "2025-01-05T09:18:26Z"
},
"after": null,
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "debezium_mysql",
"ts_ms": 1736068788000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "orders",
"server_id": 2287969543,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 5134,
"row": 0,
"thread": 49,
"query": null
},
"op": "d",
"ts_ms": 1736068788429,
"transaction": null
}
""
INSERT SELECT 操作
tmp_orders
テーブルのデータをもとにorders
テーブルへ INSERT を実行します。
INSERT INTO orders
SELECT
order_id,
customer_id,
amount,
created_at
FROM tmp_orders;
Confluent のトピックでは、INSERT イベントが以下のように記録されています。
{
"before": null,
"after": {
"order_id": 1,
"customer_id": 1,
"amount": 100,
"created_at": "2025-01-05T09:18:26Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "debezium_mysql",
"ts_ms": 1736068970000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "orders",
"server_id": 2287969543,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 5443,
"row": 0,
"thread": 49,
"query": null
},
"op": "c",
"ts_ms": 1736068970315,
"transaction": null
}
MERGE 操作
MySQL では Merge 文を実行できないため省略。
Primary Key を設定していないテーブルでの動作検証
事前準備
Primary Key を持たないテーブルを作成します。
CREATE TABLE logs (
log_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT 操作(create events)
以下のようにレコードを INSERT します。
INSERT INTO logs (log_message)
VALUES
('System boot');
Confluent のトピックを確認すると、INSERT イベントが以下のように記録されています。
{
"before": null,
"after": {
"my_row_id": 1,
"log_message": "System boot",
"created_at": "2025-01-05T09:28:57Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "debezium_mysql",
"ts_ms": 1736069337000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "logs",
"server_id": 2287969543,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 7032,
"row": 0,
"thread": 49,
"query": null
},
"op": "c",
"ts_ms": 1736069337960,
"transaction": null
}
UPDATE 操作(update events)
Primary Key が無いテーブルでは、以下のようにレコードを特定して UPDATE します。
UPDATE logs
SET log_message = '[UPDATED] System boot'
WHERE log_message = 'System boot';
Confluent のトピックでは、UPDATE イベントが以下のように記録されています。
{
"before": {
"my_row_id": 1,
"log_message": "System boot",
"created_at": "2025-01-05T09:28:57Z"
},
"after": {
"my_row_id": 1,
"log_message": "[UPDATED] System boot",
"created_at": "2025-01-05T09:28:57Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "debezium_mysql",
"ts_ms": 1736069435000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "logs",
"server_id": 2287969543,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 7352,
"row": 0,
"thread": 49,
"query": null
},
"op": "u",
"ts_ms": 1736069435250,
"transaction": null
}
DELETE 操作(delete events)
最後に、該当レコードを削除します。
DELETE FROM logs
WHERE log_message = '[UPDATED] System boot';
Confluent のトピックでは、DELETE イベントが以下のように記録されています。
{
"before": {
"my_row_id": 1,
"log_message": "[UPDATED] System boot",
"created_at": "2025-01-05T09:28:57Z"
},
"after": null,
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "debezium_mysql",
"ts_ms": 1736069498000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "logs",
"server_id": 2287969543,
"gtid": null,
"file": "mysql-bin.000001",
"pos": 7700,
"row": 0,
"thread": 49,
"query": null
},
"op": "d",
"ts_ms": 1736069498076,
"transaction": null
}
""
まとめ
以上が、Azure Database for MySQL から Confluent(Debezium)を用いた CDC の手順と、各種 DML 操作のイベントがどのようにトピックへ記録されるかの例となります。PK(Primary Key)の有無や、複数テーブルにまたがる DML などのパターンによって、Kafka に取り込まれるイベントの内容が異なる点にご注意ください。