概要
本記事では、Azure Database をソースとして、Confluent で提供される Debezium を用いて CDC(Change Data Capture)を実行する手順と、その実行結果についてご紹介します。Azure Database に対する SQL の実行を Azure Data Studio で実施しています。
本記事は、以下のシリーズの一部です。
引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita
環境準備
Azure Database の環境構築
データベースでの CDC の有効化
EXEC sys.sp_cdc_enable_db;
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = 'debezium';
サンプルのテーブルを作成
IF OBJECT_ID(N'dbo.test_01', N'U') IS NOT NULL
DROP TABLE [dbo].[test_01];
GO
CREATE TABLE [dbo].[test_01] (
[Id] INT NOT NULL,
CONSTRAINT [PK_Customers] PRIMARY KEY ([Id])
);
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'test_01',
@role_name = NULL;
本手順が必要な理由は下記記事にて説明しています。
Confluent Cloud での Microsoft SQL Server CDC Source V2 (Debezium) コネクタの作成
動作検証
1. Primary Key を設定しているテーブルでの動作検証
事前準備
まずは Customers テーブルを作成します。
-- 既存テーブルを削除 (必要に応じて)
IF OBJECT_ID(N'dbo.Customers', N'U') IS NOT NULL
DROP TABLE [dbo].[Customers];
GO
-- Customers テーブル作成
CREATE TABLE [dbo].[Customers] (
[Id] INT NOT NULL,
[FirstName] NVARCHAR(255) NOT NULL,
[LastName] NVARCHAR(255) NOT NULL,
[Email] NVARCHAR(255) NOT NULL,
[CreatedAt] DATETIME2 NOT NULL CONSTRAINT DF_Customers_CreatedAt DEFAULT (SYSUTCDATETIME()),
CONSTRAINT [PK__Customers] PRIMARY KEY ([Id])
);
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'Customers',
@role_name = NULL;
INSERT 操作(create events)
以下のようにレコードを INSERT します。
INSERT INTO [dbo].[Customers] (id, [FirstName], [LastName], [Email])
VALUES
(1, N'Anne', N'Kretchmar', N'annek@example.org');
GO
Confluent のトピックを確認すると、INSERT イベントが以下のように記録されています。
{
"before": null,
"after": {
"Id": 1,
"FirstName": "Anne",
"LastName": "Kretchmar",
"Email": "annek@example.org",
"CreatedAt": "1736078296932060300"
},
"source": {
"version": "2.4.2.Final",
"connector": "sqlserver",
"name": "debezium_sql",
"ts_ms": 1736079110102,
"snapshot": "last",
"db": "debezium",
"sequence": null,
"schema": "dbo",
"table": "Customers",
"change_lsn": null,
"commit_lsn": "00000037:00000ab8:0005",
"event_serial_no": null
},
"op": "r",
"ts_ms": 1736079110117,
"transaction": null
}
UPDATE 操作(update events)
次にレコードを UPDATE します。
UPDATE [dbo].[Customers]
SET [Email] = N'updated_anne@example.org'
WHERE [Id] = 1;
GO
Confluent のトピックを確認すると、UPDATE イベントが以下のように記録されています。
{
"before": {
"Id": 1,
"FirstName": "Anne",
"LastName": "Kretchmar",
"Email": "annek@example.org",
"CreatedAt": "1736078296932060300"
},
"after": {
"Id": 1,
"FirstName": "Anne",
"LastName": "Kretchmar",
"Email": "updated_anne@example.org",
"CreatedAt": "1736078296932060300"
},
"source": {
"version": "2.4.2.Final",
"connector": "sqlserver",
"name": "debezium_sql",
"ts_ms": 1736079342560,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"schema": "dbo",
"table": "Customers",
"change_lsn": "00000037:00000ea0:0005",
"commit_lsn": "00000037:00000ea0:0007",
"event_serial_no": 2
},
"op": "u",
"ts_ms": 1736079351813,
"transaction": null
}
DELETE 操作(delete events)
次にレコードを DELETE します。
DELETE FROM [dbo].[Customers]
WHERE [Id] = 1;
GO
Confluent のトピックを確認すると、DELETE イベントが以下のように記録されています。
{
"before": {
"Id": 1,
"FirstName": "Anne",
"LastName": "Kretchmar",
"Email": "updated_anne@example.org",
"CreatedAt": "1736078296932060300"
},
"after": null,
"source": {
"version": "2.4.2.Final",
"connector": "sqlserver",
"name": "debezium_sql",
"ts_ms": 1736079397723,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"schema": "dbo",
"table": "Customers",
"change_lsn": "00000037:00000f10:0003",
"commit_lsn": "00000037:00000f10:0007",
"event_serial_no": 1
},
"op": "d",
"ts_ms": 1736079401631,
"transaction": null
}
""
Primary Key の値を更新(Primary key updates)
Primary key(id)の値を更新する操作を実施します。
INSERT INTO [dbo].[Customers] (id, [FirstName], [LastName], [Email])
VALUES
(1, N'Anne', N'Kretchmar', N'annek@example.org');
GO
UPDATE [dbo].[Customers]
SET [Id] = 10
WHERE [Id] = 1;
GO
Confluent のトピックでは、id=1 のレコードを削除→id=10 のレコードを挿入する形でイベントが記録されます。
{
"before": {
"Id": 1,
"FirstName": "Anne",
"LastName": "Kretchmar",
"Email": "annek@example.org",
"CreatedAt": "1736079615696179100"
},
"after": null,
"source": {
"version": "2.4.2.Final",
"connector": "sqlserver",
"name": "debezium_sql",
"ts_ms": 1736079617827,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"schema": "dbo",
"table": "Customers",
"change_lsn": "00000037:00001130:0007",
"commit_lsn": "00000037:00001130:000b",
"event_serial_no": 1
},
"op": "d",
"ts_ms": 1736079622630,
"transaction": null
}
""
{
"before": null,
"after": {
"Id": 10,
"FirstName": "Anne",
"LastName": "Kretchmar",
"Email": "annek@example.org",
"CreatedAt": "1736079615696179100"
},
"source": {
"version": "2.4.2.Final",
"connector": "sqlserver",
"name": "debezium_sql",
"ts_ms": 1736079617827,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"schema": "dbo",
"table": "Customers",
"change_lsn": "00000037:00001130:0007",
"commit_lsn": "00000037:00001130:000b",
"event_serial_no": 2
},
"op": "c",
"ts_ms": 1736079622630,
"transaction": null
}
TRUNCATE 操作(truncate events)
CDC を有効化したテーブルでは、TRUNCATE TABLE を実施できないため省略。
TRUNCATE TABLE [dbo].[Customers];
GO
Cannot truncate table 'dbo.Customers' because it is published for replication or enabled for Change Data Capture.
2. 2 つのテーブルを結合した DML の動作検証
事前準備
例として、orders
テーブルとtmp_orders
テーブルを用意します。
IF OBJECT_ID(N'dbo.orders', N'U') IS NOT NULL
DROP TABLE [dbo].[orders];
GO
CREATE TABLE [dbo].[orders] (
[order_id] BIGINT NOT NULL,
[customer_id] INT NOT NULL,
[amount] DECIMAL(10,2) NULL,
[created_at] DATETIME2 NOT NULL DEFAULT (SYSUTCDATETIME()),
CONSTRAINT [PK_Orders] PRIMARY KEY ([order_id])
);
GO
IF OBJECT_ID(N'dbo.tmp_orders', N'U') IS NOT NULL
DROP TABLE [dbo].[tmp_orders];
GO
CREATE TABLE [dbo].[tmp_orders] (
[order_id] BIGINT NOT NULL,
[customer_id] INT NOT NULL,
[amount] DECIMAL(10,2) NULL,
[created_at] DATETIME2 NOT NULL DEFAULT (SYSUTCDATETIME())
);
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'orders',
@role_name = NULL;
INSERT INTO [dbo].[orders] (order_id, customer_id, amount)
VALUES
(1, 1, 0);
INSERT INTO [dbo].[tmp_orders] (order_id, customer_id, amount)
VALUES
(1, 1, 100),
(2, 2, 0);
DELETE 操作 (JOIN 例)
2 つのテーブルを JOIN してordersテーブルを DELETE します。
DELETE o
FROM [dbo].[Orders] AS o
JOIN [dbo].[tmp_orders] AS c
ON o.[order_id] = c.[order_id];
GO
Confluent のトピックを確認すると、DELETE イベントが以下のように記録されています。
{
"before": {
"order_id": 1,
"customer_id": 1,
"amount": "AA==",
"created_at": "1736080086476945900"
},
"after": null,
"source": {
"version": "2.4.2.Final",
"connector": "sqlserver",
"name": "debezium_sql",
"ts_ms": 1736080090250,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"schema": "dbo",
"table": "orders",
"change_lsn": "00000039:00000340:0003",
"commit_lsn": "00000039:00000340:0006",
"event_serial_no": 1
},
"op": "d",
"ts_ms": 1736080093632,
"transaction": null
}
""
INSERT SELECT 操作
tmp_orders
テーブルのデータをもとにordersテーブルへ INSERT を実行します。
INSERT INTO [dbo].[Orders]
SELECT
order_id,
customer_id,
amount,
created_at
FROM [dbo].[tmp_orders];
{
"before": null,
"after": {
"order_id": 1,
"customer_id": 1,
"amount": "JxA=",
"created_at": "1736080086476945900"
},
"source": {
"version": "2.4.2.Final",
"connector": "sqlserver",
"name": "debezium_sql",
"ts_ms": 1736080195473,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"schema": "dbo",
"table": "orders",
"change_lsn": "00000039:00000400:0005",
"commit_lsn": "00000039:00000400:0008",
"event_serial_no": 1
},
"op": "c",
"ts_ms": 1736080199132,
"transaction": null
}
MERGE 操作
MERGE 操作の動作を確認するためにデータを用意し、tmp_orders
とorders
を突き合わせます。
TRUNCATE TABLE [dbo].[tmp_orders];
INSERT [dbo].[tmp_orders] (order_id, customer_id, amount)
VALUES
(2, 2, 200),
(3, 3, 300);
MERGE INTO [dbo].[orders] AS c
USING [dbo].[tmp_orders] AS t
ON (c.order_id = t.order_id)
WHEN NOT MATCHED THEN
INSERT (order_id, customer_id, amount)
VALUES (t.order_id, t.customer_id, t.amount)
WHEN MATCHED THEN
UPDATE
SET customer_id = t.customer_id,
amount = t.amount;
Confluent のトピックでは、INSERT イベントと UPDATE イベントが以下のように記録されています。
{
"before": {
"order_id": 2,
"customer_id": 2,
"amount": "AA==",
"created_at": "1736080086476945900"
},
"after": {
"order_id": 2,
"customer_id": 2,
"amount": "TiA=",
"created_at": "1736080086476945900"
},
"source": {
"version": "2.4.2.Final",
"connector": "sqlserver",
"name": "debezium_sql",
"ts_ms": 1736081061943,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"schema": "dbo",
"table": "orders",
"change_lsn": "0000003a:000009c0:0007",
"commit_lsn": "0000003a:000009c0:000a",
"event_serial_no": 2
},
"op": "u",
"ts_ms": 1736081071634,
"transaction": null
}
{
"before": null,
"after": {
"order_id": 3,
"customer_id": 3,
"amount": "dTA=",
"created_at": 1736081061944533200
},
"source": {
"version": "2.4.2.Final",
"connector": "sqlserver",
"name": "debezium_sql",
"ts_ms": 1736081061943,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"schema": "dbo",
"table": "orders",
"change_lsn": "0000003a:000009c0:0008",
"commit_lsn": "0000003a:000009c0:000a",
"event_serial_no": 1
},
"op": "c",
"ts_ms": 1736081071634,
"transaction": null
}
Primary Key を設定していないテーブルでの動作検証
事前準備
Primary Key を持たないテーブルを作成します。
IF OBJECT_ID(N'dbo.Logs', N'U') IS NOT NULL
DROP TABLE [dbo].[Logs];
GO
CREATE TABLE [dbo].[Logs] (
[LogMessage] NVARCHAR(MAX) NOT NULL,
[CreatedAt] DATETIME2 NOT NULL CONSTRAINT DF_Logs_CreatedAt DEFAULT (SYSUTCDATETIME())
);
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'Logs',
@role_name = NULL;
GO
INSERT 操作(create events)
以下のようにレコードを INSERT します。
INSERT INTO [dbo].[Logs] ([LogMessage])
VALUES
(N'System boot');
GO
Confluent のトピックを確認すると、INSERT イベントが以下のように記録されています。
{
"before": null,
"after": {
"LogMessage": "System boot",
"CreatedAt": 1736080507210824700
},
"source": {
"version": "2.4.2.Final",
"connector": "sqlserver",
"name": "debezium_sql",
"ts_ms": 1736080507220,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"schema": "dbo",
"table": "Logs",
"change_lsn": "0000003a:000000d8:0020",
"commit_lsn": "0000003a:000000d8:0023",
"event_serial_no": 1
},
"op": "c",
"ts_ms": 1736080510153,
"transaction": null
}
UPDATE 操作(update events)
UPDATE [dbo].[Logs]
SET [LogMessage] = N'[UPDATED] System boot'
WHERE [LogMessage] = N'System boot';
GO
{
"before": {
"LogMessage": "System boot",
"CreatedAt": 1736080507210824700
},
"after": {
"LogMessage": "[UPDATED] System boot",
"CreatedAt": 1736080507210824700
},
"source": {
"version": "2.4.2.Final",
"connector": "sqlserver",
"name": "debezium_sql",
"ts_ms": 1736080563660,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"schema": "dbo",
"table": "Logs",
"change_lsn": "0000003a:00000460:0004",
"commit_lsn": "0000003a:00000460:0006",
"event_serial_no": 2
},
"op": "u",
"ts_ms": 1736080565135,
"transaction": null
}
DELETE 操作(delete events)
最後に、該当レコードを削除します。
DELETE FROM [dbo].[Logs]
WHERE [LogMessage] = N'[UPDATED] System boot';
GO
Confluent のトピックでは、DELETE イベントが以下のように記録されています。
{
"before": {
"LogMessage": "[UPDATED] System boot",
"CreatedAt": 1736080507210824700
},
"after": null,
"source": {
"version": "2.4.2.Final",
"connector": "sqlserver",
"name": "debezium_sql",
"ts_ms": 1736080615627,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"schema": "dbo",
"table": "Logs",
"change_lsn": "0000003a:000004c8:0002",
"commit_lsn": "0000003a:000004c8:0006",
"event_serial_no": 1
},
"op": "d",
"ts_ms": 1736080620135,
"transaction": null
}
""
まとめ
以上が、Azure Database から Confluent(Debezium)を用いた CDC の手順と、各種 DML 操作のイベントがどのようにトピックへ記録されるかの例となります。PK(Primary Key)の有無や、複数テーブルにまたがる DML などのパターンによって、Kafka に取り込まれるイベントの内容が異なる点にご注意ください。