0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure Database for MySQLをソースとして Confluent へマネージド Debezium により CDC(変更データキャプチャー)の検証結果

Last updated at Posted at 2025-01-05

概要

本記事では、Azure Database for MySQL をソースとして、Confluent で提供される Debezium を用いて CDC(Change Data Capture)を実行する手順と、その実行結果についてご紹介します。Azure Database for MySQL に対する SQL の実行を Azure Data Studio で実施しています。

本記事は、以下のシリーズの一部です。

image.png

引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita

環境準備

Azure Database for MySQL の環境構築

1. Azure Database for MySQL Flexible Server の作成

image.png

2. binlog_row_imagefullに設定

image.png

3. データベースを作成

image.png

Confluent Cloud での MySQL CDC Source V2 (Debezium) コネクタの作成

image.png

image.png

image.png

image.png

動作検証

1. Primary Key を設定しているテーブルでの動作検証

事前準備

まずは customers テーブルを作成します。

CREATE TABLE customers (
    id INT PRIMARY KEY,
    first_name VARCHAR(255) NOT NULL,
    last_name  VARCHAR(255) NOT NULL,
    email      VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

INSERT 操作(create events)

以下のようにレコードを INSERT します。

INSERT INTO customers (id, first_name, last_name, email)
VALUES
    (1, 'Anne', 'Kretchmar', 'annek@example.org');

image.png

Confluent のトピックを確認すると、INSERT イベントが以下のように記録されています。

{
  "before": null,
  "after": {
    "id": 1,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@example.org",
    "created_at": "2025-01-05T09:06:23Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "mysql",
    "name": "debezium_mysql",
    "ts_ms": 1736067983000,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "table": "customers",
    "server_id": 2287969543,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 1493,
    "row": 0,
    "thread": 49,
    "query": null
  },
  "op": "c",
  "ts_ms": 1736067983350,
  "transaction": null
}

image.png

UPDATE 操作(update events)

次にレコードを UPDATE します。

-- 既存レコードを更新
UPDATE customers
   SET email = 'updated_anne@example.org'
 WHERE id = 1;

image.png

Confluent のトピックを確認すると、UPDATE イベントが以下のように記録されています。

{
  "before": {
    "id": 1,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@example.org",
    "created_at": "2025-01-05T09:06:23Z"
  },
  "after": {
    "id": 1,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "updated_anne@example.org",
    "created_at": "2025-01-05T09:06:23Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "mysql",
    "name": "debezium_mysql",
    "ts_ms": 1736068123000,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "table": "customers",
    "server_id": 2287969543,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 1844,
    "row": 0,
    "thread": 49,
    "query": null
  },
  "op": "u",
  "ts_ms": 1736068123843,
  "transaction": null
}

image.png

DELETE 操作(delete events)

次にレコードを DELETE します。

DELETE FROM customers
 WHERE id = 1;

image.png

Confluent のトピックを確認すると、DELETE イベントが以下のように記録されています。

{
  "before": {
    "id": 1,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "updated_anne@example.org",
    "created_at": "2025-01-05T09:06:23Z"
  },
  "after": null,
  "source": {
    "version": "2.4.2.Final",
    "connector": "mysql",
    "name": "debezium_mysql",
    "ts_ms": 1736068158000,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "table": "customers",
    "server_id": 2287969543,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 2239,
    "row": 0,
    "thread": 49,
    "query": null
  },
  "op": "d",
  "ts_ms": 1736068158834,
  "transaction": null
}

image.png

""

image.png

Primary Key の値を更新(Primary key updates)

Primary key(id)の値を更新する操作を実施します。

-- 通常の INSERT
INSERT INTO customers (id, first_name, last_name, email)
VALUES
    (1, 'Anne', 'Kretchmar', 'annek@example.org');
UPDATE customers
   SET id = 10
 WHERE id = 1;

image.png

Confluent のトピックでは、id=1 のレコードを削除→id=10 のレコードを挿入する形でイベントが記録されます。

{
  "before": {
    "id": 1,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@example.org",
    "created_at": "2025-01-05T09:11:25Z"
  },
  "after": null,
  "source": {
    "version": "2.4.2.Final",
    "connector": "mysql",
    "name": "debezium_mysql",
    "ts_ms": 1736068290000,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "table": "customers",
    "server_id": 2287969543,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 2939,
    "row": 0,
    "thread": 49,
    "query": null
  },
  "op": "d",
  "ts_ms": 1736068290066,
  "transaction": null
}

image.png

""

image.png

{
  "before": null,
  "after": {
    "id": 10,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@example.org",
    "created_at": "2025-01-05T09:11:25Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "mysql",
    "name": "debezium_mysql",
    "ts_ms": 1736068290000,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "table": "customers",
    "server_id": 2287969543,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 2939,
    "row": 0,
    "thread": 49,
    "query": null
  },
  "op": "c",
  "ts_ms": 1736068290066,
  "transaction": null
}

image.png

TRUNCATE 操作(truncate events)

次にテーブルを TRUCATE します。

TRUNCATE TABLE customers;

image.png

Confluent のトピックを確認すると、INSERT イベント(opフィールドの値がc)が以下のように記録されており、想定外の動作となりました。

{
  "before": null,
  "after": {
    "id": 2,
    "first_name": "Bob",
    "last_name": "Thomas",
    "email": "bobt@example.org",
    "created_at": "2025-01-05T09:52:58Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "mysql",
    "name": "debezium_mysql",
    "ts_ms": 1736070778000,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "table": "customers",
    "server_id": 2287969543,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 8736,
    "row": 0,
    "thread": 49,
    "query": null
  },
  "op": "c",
  "ts_ms": 1736070778015,
  "transaction": null
}

image.png

2. 2 つのテーブルを結合した DML の動作検証

事前準備

例として、ordersテーブルとtmp_ordersテーブルを用意します。

CREATE TABLE orders (
    order_id    INT PRIMARY KEY,
    customer_id INT NOT NULL,
    amount      INT,
    created_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE tmp_orders (
    order_id    INT PRIMARY KEY,
    customer_id INT NOT NULL,
    amount      INT,
    created_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

image.png

INSERT INTO orders (order_id, customer_id, amount)
VALUES
    (1, 1, 0);

INSERT INTO tmp_orders (order_id, customer_id, amount)
VALUES
    (1, 1, 100),
    (2, 2, 0);

image.png

DELETE 操作 (JOIN 例)

2 つのテーブルを JOIN してordersテーブルを DELETE します。

DELETE orders
  FROM orders
  JOIN tmp_orders
      ON orders.order_id = tmp_orders.order_id;

image.png

{
  "before": {
    "order_id": 1,
    "customer_id": 1,
    "amount": 0,
    "created_at": "2025-01-05T09:18:26Z"
  },
  "after": null,
  "source": {
    "version": "2.4.2.Final",
    "connector": "mysql",
    "name": "debezium_mysql",
    "ts_ms": 1736068788000,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "table": "orders",
    "server_id": 2287969543,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 5134,
    "row": 0,
    "thread": 49,
    "query": null
  },
  "op": "d",
  "ts_ms": 1736068788429,
  "transaction": null
}

image.png

""

image.png

INSERT SELECT 操作

tmp_ordersテーブルのデータをもとにordersテーブルへ INSERT を実行します。

INSERT INTO orders
SELECT
    order_id,
    customer_id,
    amount,
    created_at
  FROM tmp_orders;

image.png

Confluent のトピックでは、INSERT イベントが以下のように記録されています。

{
  "before": null,
  "after": {
    "order_id": 1,
    "customer_id": 1,
    "amount": 100,
    "created_at": "2025-01-05T09:18:26Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "mysql",
    "name": "debezium_mysql",
    "ts_ms": 1736068970000,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "table": "orders",
    "server_id": 2287969543,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 5443,
    "row": 0,
    "thread": 49,
    "query": null
  },
  "op": "c",
  "ts_ms": 1736068970315,
  "transaction": null
}

image.png

MERGE 操作

MySQL では Merge 文を実行できないため省略。

Primary Key を設定していないテーブルでの動作検証

事前準備

Primary Key を持たないテーブルを作成します。

CREATE TABLE logs (
    log_message TEXT,
    created_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

image.png

INSERT 操作(create events)

以下のようにレコードを INSERT します。

INSERT INTO logs (log_message)
VALUES
   ('System boot');

image.png

Confluent のトピックを確認すると、INSERT イベントが以下のように記録されています。

{
  "before": null,
  "after": {
    "my_row_id": 1,
    "log_message": "System boot",
    "created_at": "2025-01-05T09:28:57Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "mysql",
    "name": "debezium_mysql",
    "ts_ms": 1736069337000,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "table": "logs",
    "server_id": 2287969543,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 7032,
    "row": 0,
    "thread": 49,
    "query": null
  },
  "op": "c",
  "ts_ms": 1736069337960,
  "transaction": null
}

image.png

UPDATE 操作(update events)

Primary Key が無いテーブルでは、以下のようにレコードを特定して UPDATE します。

UPDATE logs
   SET log_message = '[UPDATED] System boot'
 WHERE log_message = 'System boot';

image.png

Confluent のトピックでは、UPDATE イベントが以下のように記録されています。

{
  "before": {
    "my_row_id": 1,
    "log_message": "System boot",
    "created_at": "2025-01-05T09:28:57Z"
  },
  "after": {
    "my_row_id": 1,
    "log_message": "[UPDATED] System boot",
    "created_at": "2025-01-05T09:28:57Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "mysql",
    "name": "debezium_mysql",
    "ts_ms": 1736069435000,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "table": "logs",
    "server_id": 2287969543,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 7352,
    "row": 0,
    "thread": 49,
    "query": null
  },
  "op": "u",
  "ts_ms": 1736069435250,
  "transaction": null
}

image.png

DELETE 操作(delete events)

最後に、該当レコードを削除します。

DELETE FROM logs
 WHERE log_message = '[UPDATED] System boot';

image.png

Confluent のトピックでは、DELETE イベントが以下のように記録されています。

{
  "before": {
    "my_row_id": 1,
    "log_message": "[UPDATED] System boot",
    "created_at": "2025-01-05T09:28:57Z"
  },
  "after": null,
  "source": {
    "version": "2.4.2.Final",
    "connector": "mysql",
    "name": "debezium_mysql",
    "ts_ms": 1736069498000,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "table": "logs",
    "server_id": 2287969543,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 7700,
    "row": 0,
    "thread": 49,
    "query": null
  },
  "op": "d",
  "ts_ms": 1736069498076,
  "transaction": null
}

image.png

""

image.png

まとめ

以上が、Azure Database for MySQL から Confluent(Debezium)を用いた CDC の手順と、各種 DML 操作のイベントがどのようにトピックへ記録されるかの例となります。PK(Primary Key)の有無や、複数テーブルにまたがる DML などのパターンによって、Kafka に取り込まれるイベントの内容が異なる点にご注意ください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?