Verification Results of CDC (Change Data Capture) from Azure Database to Confluent with Managed Debezium

Posted at 2025-01-05

Overview

This article walks through the steps for running CDC (Change Data Capture) with Azure Database as the source and the Debezium connector provided by Confluent, and presents the results. The SQL statements against Azure Database were executed with Azure Data Studio.

This article is part of the following series.

image.png

Source: Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita

Environment setup

Setting up Azure Database

image.png

image.png

Enabling CDC on the database

EXEC sys.sp_cdc_enable_db;
SELECT name, is_cdc_enabled
FROM sys.databases  
WHERE name = 'debezium';

image.png

Creating a sample table

IF OBJECT_ID(N'dbo.test_01', N'U') IS NOT NULL
    DROP TABLE [dbo].[test_01];
GO
CREATE TABLE [dbo].[test_01] (
    [Id] INT NOT NULL,
    CONSTRAINT [PK_test_01] PRIMARY KEY ([Id])
);
GO
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'test_01',
    @role_name = NULL;

image.png

The reason this step is required is explained in the following article.

Creating the Microsoft SQL Server CDC Source V2 (Debezium) connector in Confluent Cloud

image.png

image.png

image.png

Verification

1. Verifying a table with a Primary Key

Preparation

First, create the Customers table and enable CDC on it.

-- Drop the existing table (if needed)
IF OBJECT_ID(N'dbo.Customers', N'U') IS NOT NULL
    DROP TABLE [dbo].[Customers];
GO

-- Create the Customers table
CREATE TABLE [dbo].[Customers] (
    [Id] INT NOT NULL,
    [FirstName] NVARCHAR(255) NOT NULL,
    [LastName]  NVARCHAR(255) NOT NULL,
    [Email]     NVARCHAR(255) NOT NULL,
    [CreatedAt] DATETIME2 NOT NULL CONSTRAINT DF_Customers_CreatedAt DEFAULT (SYSUTCDATETIME()),
    CONSTRAINT [PK__Customers] PRIMARY KEY ([Id])
);
GO

EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'Customers',
    @role_name = NULL;

image.png

INSERT operation (create events)

Insert a record as follows.

INSERT INTO [dbo].[Customers] ([Id], [FirstName], [LastName], [Email])
VALUES
    (1, N'Anne', N'Kretchmar', N'annek@example.org');
GO

image.png

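Checking the Confluent topic shows the inserted record captured as follows. Note that "op" is "r" and "source.snapshot" is "last" in this event, which indicates that the row was picked up by the connector's initial snapshot rather than streamed as a create event ("op": "c").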

{
  "before": null,
  "after": {
    "Id": 1,
    "FirstName": "Anne",
    "LastName": "Kretchmar",
    "Email": "annek@example.org",
    "CreatedAt": "1736078296932060300"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "sqlserver",
    "name": "debezium_sql",
    "ts_ms": 1736079110102,
    "snapshot": "last",
    "db": "debezium",
    "sequence": null,
    "schema": "dbo",
    "table": "Customers",
    "change_lsn": null,
    "commit_lsn": "00000037:00000ab8:0005",
    "event_serial_no": null
  },
  "op": "r",
  "ts_ms": 1736079110117,
  "transaction": null
}

image.png
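A consumer can tell snapshot rows from streamed changes by reading the "op" and "source.snapshot" fields of each event. The following is a minimal Python sketch under that assumption. The names describe_event and OP_NAMES are illustrative and do not appear in the original article, and only the operation codes seen in this article are mapped.

```python
import json

# Operation codes that appear in this article's events.
OP_NAMES = {"r": "snapshot read", "c": "create", "u": "update", "d": "delete"}


def describe_event(raw: str) -> str:
    """Summarize one Debezium change event value given as a JSON string."""
    event = json.loads(raw)
    source = event["source"]
    op = OP_NAMES.get(event["op"], "unknown")
    # "snapshot" is the string "false" for streamed changes; any other
    # value (for example "last") marks a row read during a snapshot.
    origin = "streaming" if source["snapshot"] == "false" else "snapshot"
    return f"{source['schema']}.{source['table']}: {op} ({origin})"


# Trimmed copy of the first event shown above (hypothetical test input).
sample = json.dumps({
    "op": "r",
    "source": {"snapshot": "last", "schema": "dbo", "table": "Customers"},
})
print(describe_event(sample))
```

Downstream code that treats "r" and "c" differently (for example, when counting inserts) would need to account for rows like this one that arrive through the snapshot.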

UPDATE operation (update events)

Next, update the record.

UPDATE [dbo].[Customers]
   SET [Email] = N'updated_anne@example.org'
 WHERE [Id] = 1;
GO

image.png

Checking the Confluent topic shows the UPDATE event recorded as follows.

{
  "before": {
    "Id": 1,
    "FirstName": "Anne",
    "LastName": "Kretchmar",
    "Email": "annek@example.org",
    "CreatedAt": "1736078296932060300"
  },
  "after": {
    "Id": 1,
    "FirstName": "Anne",
    "LastName": "Kretchmar",
    "Email": "updated_anne@example.org",
    "CreatedAt": "1736078296932060300"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "sqlserver",
    "name": "debezium_sql",
    "ts_ms": 1736079342560,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "schema": "dbo",
    "table": "Customers",
    "change_lsn": "00000037:00000ea0:0005",
    "commit_lsn": "00000037:00000ea0:0007",
    "event_serial_no": 2
  },
  "op": "u",
  "ts_ms": 1736079351813,
  "transaction": null
}

image.png
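Because an update event carries both the "before" and "after" row images, the changed columns can be derived by comparing the two. Below is a minimal sketch, assuming the event value has already been parsed into a Python dict. The helper name changed_fields is hypothetical.

```python
def changed_fields(event: dict) -> dict:
    """Map each changed column to its (before, after) pair."""
    before = event.get("before") or {}
    after = event.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }


# Row images copied from the update event above.
update_event = {
    "op": "u",
    "before": {
        "Id": 1,
        "FirstName": "Anne",
        "LastName": "Kretchmar",
        "Email": "annek@example.org",
        "CreatedAt": "1736078296932060300",
    },
    "after": {
        "Id": 1,
        "FirstName": "Anne",
        "LastName": "Kretchmar",
        "Email": "updated_anne@example.org",
        "CreatedAt": "1736078296932060300",
    },
}
print(changed_fields(update_event))
```

For this event only the Email column differs, which matches the UPDATE statement that was executed.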

DELETE operation (delete events)

Next, delete the record.

DELETE FROM [dbo].[Customers]
 WHERE [Id] = 1;
GO

image.png

Checking the Confluent topic shows the DELETE event recorded as follows.

{
  "before": {
    "Id": 1,
    "FirstName": "Anne",
    "LastName": "Kretchmar",
    "Email": "updated_anne@example.org",
    "CreatedAt": "1736078296932060300"
  },
  "after": null,
  "source": {
    "version": "2.4.2.Final",
    "connector": "sqlserver",
    "name": "debezium_sql",
    "ts_ms": 1736079397723,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "schema": "dbo",
    "table": "Customers",
    "change_lsn": "00000037:00000f10:0003",
    "commit_lsn": "00000037:00000f10:0007",
    "event_serial_no": 1
  },
  "op": "d",
  "ts_ms": 1736079401631,
  "transaction": null
}

image.png

""

image.png

Updating the Primary Key value (Primary key updates)

Next, update the value of the Primary Key (Id).

INSERT INTO [dbo].[Customers] ([Id], [FirstName], [LastName], [Email])
VALUES
    (1, N'Anne', N'Kretchmar', N'annek@example.org');
GO
UPDATE [dbo].[Customers]
   SET [Id] = 10
 WHERE [Id] = 1;
GO

image.png

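In the Confluent topic, the change is recorded as a delete of the record with Id = 1 followed by a create of the record with Id = 10. Both events carry the same change_lsn and commit_lsn.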

{
  "before": {
    "Id": 1,
    "FirstName": "Anne",
    "LastName": "Kretchmar",
    "Email": "annek@example.org",
    "CreatedAt": "1736079615696179100"
  },
  "after": null,
  "source": {
    "version": "2.4.2.Final",
    "connector": "sqlserver",
    "name": "debezium_sql",
    "ts_ms": 1736079617827,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "schema": "dbo",
    "table": "Customers",
    "change_lsn": "00000037:00001130:0007",
    "commit_lsn": "00000037:00001130:000b",
    "event_serial_no": 1
  },
  "op": "d",
  "ts_ms": 1736079622630,
  "transaction": null
}

image.png

""

image.png

{
  "before": null,
  "after": {
    "Id": 10,
    "FirstName": "Anne",
    "LastName": "Kretchmar",
    "Email": "annek@example.org",
    "CreatedAt": "1736079615696179100"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "sqlserver",
    "name": "debezium_sql",
    "ts_ms": 1736079617827,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "schema": "dbo",
    "table": "Customers",
    "change_lsn": "00000037:00001130:0007",
    "commit_lsn": "00000037:00001130:000b",
    "event_serial_no": 2
  },
  "op": "c",
  "ts_ms": 1736079622630,
  "transaction": null
}

image.png
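One way to reason about this delete-then-create pair is to replay it against an in-memory copy of the table keyed by Id. The sketch below is an illustration, not the connector's own logic. The names apply_event and state are hypothetical, the events are trimmed to the fields used, and None stands for a tombstone record (a null value that Debezium can emit after a delete, depending on connector settings).

```python
from typing import Optional


def apply_event(state: dict, event: Optional[dict], key_column: str) -> None:
    """Apply one change event to a dict that maps key value -> row."""
    if event is None:
        # Tombstone: nothing to apply, the delete was already handled.
        return
    op = event["op"]
    if op == "d":
        state.pop(event["before"][key_column], None)
    elif op in ("c", "r", "u"):
        row = event["after"]
        state[row[key_column]] = row


# Non-key columns shared by the row images in this section.
row = {"FirstName": "Anne", "LastName": "Kretchmar", "Email": "annek@example.org"}
state = {1: {"Id": 1, **row}}
events = [
    {"op": "d", "before": {"Id": 1, **row}, "after": None},
    None,  # assumed tombstone between the two events
    {"op": "c", "before": None, "after": {"Id": 10, **row}},
]
for event in events:
    apply_event(state, event, "Id")
print(sorted(state))
```

After the replay the row exists only under Id 10, which is the same end state as the UPDATE on the source table.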

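TRUNCATE operation (truncate events)

TRUNCATE TABLE cannot be run against a CDC-enabled table, so this test is omitted. Running the statement below fails with the error that follows it.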

TRUNCATE TABLE [dbo].[Customers];
GO

Cannot truncate table 'dbo.Customers' because it is published for replication or enabled for Change Data Capture.

2. Verifying DML that joins two tables

Preparation

As an example, prepare an orders table and a tmp_orders table. CDC is enabled only on the orders table.

IF OBJECT_ID(N'dbo.orders', N'U') IS NOT NULL
    DROP TABLE [dbo].[orders];
GO

CREATE TABLE [dbo].[orders] (
    [order_id]   BIGINT NOT NULL,
    [customer_id] INT NOT NULL,
    [amount]    DECIMAL(10,2) NULL,
    [created_at] DATETIME2 NOT NULL DEFAULT (SYSUTCDATETIME()),
    CONSTRAINT [PK_Orders] PRIMARY KEY ([order_id])
);
GO

IF OBJECT_ID(N'dbo.tmp_orders', N'U') IS NOT NULL
    DROP TABLE [dbo].[tmp_orders];
GO

CREATE TABLE [dbo].[tmp_orders] (
    [order_id]   BIGINT NOT NULL,
    [customer_id] INT NOT NULL,
    [amount]    DECIMAL(10,2) NULL,
    [created_at] DATETIME2 NOT NULL DEFAULT (SYSUTCDATETIME())
);
GO

EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'orders',
    @role_name = NULL;

image.png

INSERT INTO [dbo].[orders] (order_id, customer_id, amount)
VALUES
    (1, 1, 0);

INSERT INTO [dbo].[tmp_orders] (order_id, customer_id, amount)
VALUES
    (1, 1, 100),
    (2, 2, 0);

image.png

DELETE operation (JOIN example)

Delete rows from the orders table by joining the two tables.

DELETE o
FROM [dbo].[orders] AS o
JOIN [dbo].[tmp_orders] AS c
ON o.[order_id] = c.[order_id];
GO

image.png

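Checking the Confluent topic shows the DELETE event recorded as follows. The DECIMAL column amount appears as a Base64-encoded byte string ("AA==" is the single byte 0x00, that is, 0).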

{
  "before": {
    "order_id": 1,
    "customer_id": 1,
    "amount": "AA==",
    "created_at": "1736080086476945900"
  },
  "after": null,
  "source": {
    "version": "2.4.2.Final",
    "connector": "sqlserver",
    "name": "debezium_sql",
    "ts_ms": 1736080090250,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "schema": "dbo",
    "table": "orders",
    "change_lsn": "00000039:00000340:0003",
    "commit_lsn": "00000039:00000340:0006",
    "event_serial_no": 1
  },
  "op": "d",
  "ts_ms": 1736080093632,
  "transaction": null
}

image.png

""

image.png

INSERT SELECT operation

Insert rows into the orders table based on the data in the tmp_orders table.

INSERT INTO [dbo].[orders]
SELECT
    order_id,
    customer_id,
    amount,
    created_at
  FROM [dbo].[tmp_orders];

image.png

{
  "before": null,
  "after": {
    "order_id": 1,
    "customer_id": 1,
    "amount": "JxA=",
    "created_at": "1736080086476945900"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "sqlserver",
    "name": "debezium_sql",
    "ts_ms": 1736080195473,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "schema": "dbo",
    "table": "orders",
    "change_lsn": "00000039:00000400:0005",
    "commit_lsn": "00000039:00000400:0008",
    "event_serial_no": 1
  },
  "op": "c",
  "ts_ms": 1736080199132,
  "transaction": null
}

image.png
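The amount values in these events, such as "JxA=", are Base64 strings. This is consistent with Debezium's precise decimal handling, in which a DECIMAL column is sent as the big-endian two's-complement bytes of the unscaled value and a JSON view renders those bytes as Base64. The sketch below decodes the amount values that appear in this article, assuming a scale of 2 taken from the DECIMAL(10,2) column definition. The helper name decode_decimal is hypothetical.

```python
import base64
from decimal import Decimal


def decode_decimal(encoded: str, scale: int) -> Decimal:
    """Decode a Base64-encoded unscaled integer into a Decimal."""
    raw = base64.b64decode(encoded)
    # The bytes hold a signed big-endian integer (the unscaled value).
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    # Shift the decimal point left by `scale` digits.
    return Decimal(unscaled).scaleb(-scale)


# scale=2 is an assumption based on DECIMAL(10,2) in the table definition.
for encoded in ("AA==", "JxA=", "TiA=", "dTA="):
    print(encoded, decode_decimal(encoded, scale=2))
```

Decoded this way, "JxA=" is the unscaled value 10000, that is 100.00, which matches the amount stored in tmp_orders for order_id 1.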

MERGE operation

To check the behavior of MERGE, prepare data in tmp_orders and merge it into orders.

TRUNCATE TABLE [dbo].[tmp_orders];
INSERT [dbo].[tmp_orders] (order_id, customer_id, amount)
VALUES
    (2, 2, 200),
    (3, 3, 300);
MERGE INTO [dbo].[orders] AS c
USING [dbo].[tmp_orders] AS t
  ON (c.order_id = t.order_id) 
WHEN NOT MATCHED THEN
  INSERT (order_id, customer_id, amount)
  VALUES (t.order_id, t.customer_id, t.amount)
WHEN MATCHED THEN
  UPDATE
     SET customer_id = t.customer_id,
         amount  = t.amount;

image.png

In the Confluent topic, an UPDATE event (order_id = 2) and an INSERT event (order_id = 3) are recorded as follows. Both events share the same commit_lsn.

{
  "before": {
    "order_id": 2,
    "customer_id": 2,
    "amount": "AA==",
    "created_at": "1736080086476945900"
  },
  "after": {
    "order_id": 2,
    "customer_id": 2,
    "amount": "TiA=",
    "created_at": "1736080086476945900"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "sqlserver",
    "name": "debezium_sql",
    "ts_ms": 1736081061943,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "schema": "dbo",
    "table": "orders",
    "change_lsn": "0000003a:000009c0:0007",
    "commit_lsn": "0000003a:000009c0:000a",
    "event_serial_no": 2
  },
  "op": "u",
  "ts_ms": 1736081071634,
  "transaction": null
}

image.png

{
  "before": null,
  "after": {
    "order_id": 3,
    "customer_id": 3,
    "amount": "dTA=",
    "created_at": 1736081061944533200
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "sqlserver",
    "name": "debezium_sql",
    "ts_ms": 1736081061943,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "schema": "dbo",
    "table": "orders",
    "change_lsn": "0000003a:000009c0:0008",
    "commit_lsn": "0000003a:000009c0:000a",
    "event_serial_no": 1
  },
  "op": "c",
  "ts_ms": 1736081071634,
  "transaction": null
}

image.png
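Since the two MERGE events share one commit_lsn, a consumer could regroup them into the originating transaction. Below is a minimal sketch, assuming the events are parsed dicts and that the fixed-width hexadecimal LSN strings sort correctly as plain strings. The name group_by_transaction is hypothetical, and the events are trimmed to the fields used.

```python
from collections import defaultdict


def group_by_transaction(events: list) -> dict:
    """Group events by commit_lsn, ordered by change_lsn within a group."""
    groups = defaultdict(list)
    for event in events:
        groups[event["source"]["commit_lsn"]].append(event)
    for members in groups.values():
        # Assumption: LSN strings are fixed-width hex, so string order works.
        members.sort(key=lambda e: e["source"]["change_lsn"])
    return dict(groups)


# LSN values copied from the two MERGE events above, in arbitrary order.
merge_events = [
    {"op": "c", "source": {"change_lsn": "0000003a:000009c0:0008",
                           "commit_lsn": "0000003a:000009c0:000a"}},
    {"op": "u", "source": {"change_lsn": "0000003a:000009c0:0007",
                           "commit_lsn": "0000003a:000009c0:000a"}},
]
grouped = group_by_transaction(merge_events)
print({lsn: [e["op"] for e in members] for lsn, members in grouped.items()})
```

Grouping on commit_lsn is only one possible design. It keeps the sketch independent of any transaction metadata, which is null in the events shown here.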

3. Verifying a table without a Primary Key

Preparation

Create a table that has no Primary Key.

IF OBJECT_ID(N'dbo.Logs', N'U') IS NOT NULL
    DROP TABLE [dbo].[Logs];
GO

CREATE TABLE [dbo].[Logs] (
    [LogMessage] NVARCHAR(MAX) NOT NULL,
    [CreatedAt]  DATETIME2 NOT NULL CONSTRAINT DF_Logs_CreatedAt DEFAULT (SYSUTCDATETIME())
);
GO

EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'Logs',
    @role_name = NULL;
GO

image.png

INSERT operation (create events)

Insert a record as follows.

INSERT INTO [dbo].[Logs] ([LogMessage])
VALUES
    (N'System boot');
GO

image.png

Checking the Confluent topic shows the INSERT event recorded as follows.

{
  "before": null,
  "after": {
    "LogMessage": "System boot",
    "CreatedAt": 1736080507210824700
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "sqlserver",
    "name": "debezium_sql",
    "ts_ms": 1736080507220,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "schema": "dbo",
    "table": "Logs",
    "change_lsn": "0000003a:000000d8:0020",
    "commit_lsn": "0000003a:000000d8:0023",
    "event_serial_no": 1
  },
  "op": "c",
  "ts_ms": 1736080510153,
  "transaction": null
}

image.png
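The CreatedAt value is a large integer rather than a formatted date. Its magnitude is consistent with nanoseconds since the Unix epoch, which is how Debezium represents DATETIME2(7) columns by default. The sketch below converts such a value to a UTC datetime under that assumption. The helper name nanos_to_utc is hypothetical, and sub-microsecond digits are dropped because Python's datetime has microsecond resolution.

```python
from datetime import datetime, timedelta, timezone


def nanos_to_utc(value) -> datetime:
    """Convert epoch nanoseconds (int or numeric string) to a UTC datetime."""
    seconds, nanos = divmod(int(value), 1_000_000_000)
    base = datetime.fromtimestamp(seconds, tz=timezone.utc)
    # datetime stores microseconds, so the last three digits are truncated.
    return base + timedelta(microseconds=nanos // 1000)


# Value copied from the INSERT event above.
print(nanos_to_utc(1736080507210824700).isoformat())
```

The function accepts both the numeric form shown in this event and the quoted string form that appears in the Customers events earlier in the article.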

UPDATE operation (update events)

UPDATE [dbo].[Logs]
   SET [LogMessage] = N'[UPDATED] System boot'
 WHERE [LogMessage] = N'System boot';
GO

image.png

{
  "before": {
    "LogMessage": "System boot",
    "CreatedAt": 1736080507210824700
  },
  "after": {
    "LogMessage": "[UPDATED] System boot",
    "CreatedAt": 1736080507210824700
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "sqlserver",
    "name": "debezium_sql",
    "ts_ms": 1736080563660,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "schema": "dbo",
    "table": "Logs",
    "change_lsn": "0000003a:00000460:0004",
    "commit_lsn": "0000003a:00000460:0006",
    "event_serial_no": 2
  },
  "op": "u",
  "ts_ms": 1736080565135,
  "transaction": null
}

image.png

DELETE operation (delete events)

Finally, delete the record.

DELETE FROM [dbo].[Logs]
 WHERE [LogMessage] = N'[UPDATED] System boot';
GO

image.png

In the Confluent topic, the DELETE event is recorded as follows.

{
  "before": {
    "LogMessage": "[UPDATED] System boot",
    "CreatedAt": 1736080507210824700
  },
  "after": null,
  "source": {
    "version": "2.4.2.Final",
    "connector": "sqlserver",
    "name": "debezium_sql",
    "ts_ms": 1736080615627,
    "snapshot": "false",
    "db": "debezium",
    "sequence": null,
    "schema": "dbo",
    "table": "Logs",
    "change_lsn": "0000003a:000004c8:0002",
    "commit_lsn": "0000003a:000004c8:0006",
    "event_serial_no": 1
  },
  "op": "d",
  "ts_ms": 1736080620135,
  "transaction": null
}

image.png

""

image.png
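Debezium builds the record key from the table's primary key, so records for a table without one normally carry no key (the article does not show record keys, so this is an assumption here). A consumer then has to locate rows by their full "before" image. The sketch below replays the three Logs events that way. The name apply_keyless_event is hypothetical, and the events are trimmed to the fields used.

```python
def apply_keyless_event(rows: list, event: dict) -> None:
    """Apply a change event to a list of rows, matching on the full row image."""
    op = event["op"]
    if op in ("u", "d"):
        # Removes the first row equal to the "before" image;
        # raises ValueError if no such row exists.
        rows.remove(event["before"])
    if op in ("c", "r", "u"):
        rows.append(event["after"])


created_at = 1736080507210824700
old = {"LogMessage": "System boot", "CreatedAt": created_at}
new = {"LogMessage": "[UPDATED] System boot", "CreatedAt": created_at}
log_events = [
    {"op": "c", "before": None, "after": old},
    {"op": "u", "before": old, "after": new},
    {"op": "d", "before": new, "after": None},
]

rows = []
snapshots = []
for event in log_events:
    apply_keyless_event(rows, event)
    snapshots.append([row["LogMessage"] for row in rows])
print(snapshots)
```

Matching on the whole row cannot distinguish exact duplicate rows, which is one reason a primary key (or an explicitly configured key) is generally preferable for CDC targets.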

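Summary
This article covered the steps for CDC from Azure Database using Confluent (Debezium) and showed examples of how the events for various DML operations are recorded in the topic. Note that the events delivered to Kafka differ depending on patterns such as whether the table has a PK (Primary Key) and whether the DML spans multiple tables.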
