概要
本記事では、Azure Database for PostgreSQL をソースとして、Confluent で提供される Debezium を用いて CDC(Change Data Capture)を実行する手順と、その実行結果についてご紹介します。Azure Database for PostgreSQL に対する SQL の実行を Azure Data Studio で実施しています。
本記事は、以下のシリーズの一部です。
引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita
環境準備
Azure Database for PostgreSQL の環境構築
1. Azure Database for PostgreSQL Flexible Server の作成
2. wal_level
を LOGICAL
に設定
3. スキーマ作成
DROP SCHEMA IF EXISTS debezium_test CASCADE;
CREATE SCHEMA debezium_test;
4. PUBLICATION の作成
CREATE PUBLICATION debezium_publication FOR ALL TABLES;
5. REPLICATION
権限の付与
ALTER ROLE debezium WITH REPLICATION;
Confluent Cloud での Postgres CDC Source V2 (Debezium) コネクタの作成
以下の画面例のように、Confluent Cloud で Debezium コネクタを作成します。
動作検証
1. Primary Key を設定しているテーブルでの動作検証
事前準備
まずは customers
テーブルを作成します。
-- テーブル作成
CREATE TABLE debezium_test.customers (
id BIGINT PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
INSERT 操作(create events)
以下のようにレコードを INSERT します。
-- 通常の INSERT
INSERT INTO debezium_test.customers (id, first_name, last_name, email)
VALUES
(1, 'Anne', 'Kretchmar', 'annek@example.org');
Confluent のトピックを確認すると、INSERT イベントが以下のように記録されています。
{
"before": null,
"after": {
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@example.org",
"created_at": "2025-01-05T06:49:37.681496Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "debezium",
"ts_ms": 1736059777682,
"snapshot": "false",
"db": "postgres",
"sequence": "[null,\"67376224\"]",
"schema": "debezium_test",
"table": "customers",
"txId": 1060,
"lsn": 67376224,
"xmin": null
},
"op": "c",
"ts_ms": 1736059777909,
"transaction": null
}
UPDATE 操作(update events)
次にレコードを UPDATE します。
-- 既存レコードの更新
UPDATE debezium_test.customers
SET email = 'updated_anne@example.org'
WHERE id = 1;
Confluent のトピックを確認すると、UPDATE イベントが以下のように記録されています。
{
"before": null,
"after": {
"id": 1,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "updated_anne@example.org",
"created_at": "2025-01-05T06:49:37.681496Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "debezium",
"ts_ms": 1736059837649,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"67376544\",\"83896440\"]",
"schema": "debezium_test",
"table": "customers",
"txId": 1075,
"lsn": 83896440,
"xmin": null
},
"op": "u",
"ts_ms": 1736059837776,
"transaction": null
}
DELETE 操作(delete events)
次にレコードを DELETE します。
-- レコード削除
DELETE FROM debezium_test.customers
WHERE id = 1;
Confluent のトピックを確認すると、DELETE イベントが以下のように記録されています。
{
"before": {
"id": 1,
"first_name": "",
"last_name": "",
"email": "",
"created_at": "1970-01-01T00:00:00.000000Z"
},
"after": null,
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "debezium",
"ts_ms": 1736059892606,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"83896616\",\"83899008\"]",
"schema": "debezium_test",
"table": "customers",
"txId": 1089,
"lsn": 83899008,
"xmin": null
},
"op": "d",
"ts_ms": 1736059893043,
"transaction": null
}
Primary Key の値を更新(Primary key updates)
Primary key(id)の値を更新する操作を実施します。
-- 通常の INSERT
INSERT INTO debezium_test.customers (id, first_name, last_name, email)
VALUES
(1, 'Anne', 'Kretchmar', 'annek@example.org');
UPDATE debezium_test.customers
SET id = 10
WHERE id = 1;
Confluent のトピックでは、id=1 のレコードを削除→id=10 のレコードを挿入する形でイベントが記録されます。
{
"before": {
"id": 1,
"first_name": "",
"last_name": "",
"email": "",
"created_at": "1970-01-01T00:00:00.000000Z"
},
"after": null,
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "debezium",
"ts_ms": 1736059959049,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"83902280\",\"83902280\"]",
"schema": "debezium_test",
"table": "customers",
"txId": 1106,
"lsn": 83902280,
"xmin": null
},
"op": "d",
"ts_ms": 1736059959282,
"transaction": null
}
""
{
"before": null,
"after": {
"id": 10,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@example.org",
"created_at": "2025-01-05T06:52:36.387167Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "debezium",
"ts_ms": 1736059959049,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"83902280\",\"83902280\"]",
"schema": "debezium_test",
"table": "customers",
"txId": 1106,
"lsn": 83902280,
"xmin": null
},
"op": "c",
"ts_ms": 1736059959282,
"transaction": null
}
2. 2 つのテーブルを結合した DML の動作検証
事前準備
例として、orders
テーブルと tmp_orders
テーブルを用意します。
-- 例: orders テーブル
CREATE TABLE debezium_test.orders (
order_id BIGINT PRIMARY KEY,
customer_id BIGINT NOT NULL,
amount BIGINT,
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE debezium_test.tmp_orders (
order_id BIGINT PRIMARY KEY,
customer_id BIGINT NOT NULL,
amount BIGINT,
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO debezium_test.orders (order_id, customer_id, amount)
VALUES
(1, 1, 0);
INSERT INTO debezium_test.tmp_orders (order_id, customer_id, amount)
VALUES
(1, 1, 100),
(2, 2, 0);
DELETE 操作 (JOIN 例)
2 つのテーブルを JOIN して orders
テーブルを DELETE します。
DELETE FROM debezium_test.orders o
USING debezium_test.tmp_orders c
WHERE o.order_id = c.order_id;
Confluent のトピックを見ると、DELETE イベントが以下のように記録されています。
{
"before": {
"order_id": 1,
"customer_id": 0,
"amount": null,
"created_at": "1970-01-01T00:00:00.000000Z"
},
"after": null,
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "debezium",
"ts_ms": 1736060074117,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"83997784\",\"83998040\"]",
"schema": "debezium_test",
"table": "orders",
"txId": 1137,
"lsn": 83998040,
"xmin": null
},
"op": "d",
"ts_ms": 1736060074330,
"transaction": null
}
""
INSERT SELECT 操作
tmp_orders
テーブルのデータをもとに orders
テーブルへ INSERT を実行します。
INSERT INTO debezium_test.orders
SELECT
order_id,
customer_id,
amount,
created_at
FROM debezium_test.tmp_orders;
Confluent のトピックでは、INSERT イベントが以下のように記録されています。
{
"before": null,
"after": {
"order_id": 1,
"customer_id": 1,
"amount": 100,
"created_at": "2025-01-05T06:54:29.435440Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "debezium",
"ts_ms": 1736060196651,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"83998160\",\"100667016\"]",
"schema": "debezium_test",
"table": "orders",
"txId": 1167,
"lsn": 100667016,
"xmin": null
},
"op": "c",
"ts_ms": 1736060196778,
"transaction": null
}
MERGE 操作
MERGE
操作の動作を確認するためにデータを用意し、tmp_orders
と orders
を突き合わせます。
TRUNCATE TABLE debezium_test.tmp_orders;
INSERT INTO debezium_test.tmp_orders (order_id, customer_id, amount)
VALUES
(2, 2, 200),
(3, 3, 300);
MERGE INTO debezium_test.orders AS target
USING debezium_test.tmp_orders AS source
ON (target.customer_id = source.customer_id)
WHEN MATCHED THEN
UPDATE SET
amount = source.amount,
customer_id = source.customer_id,
created_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
INSERT (order_id, customer_id, amount)
VALUES (source.order_id, source.customer_id, source.amount);
Confluent のトピックでは、INSERT イベントと UPDATE イベントが以下のように記録されています。
{
"before": null,
"after": {
"order_id": 3,
"customer_id": 3,
"amount": 300,
"created_at": "2025-01-05T06:58:02.160738Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "debezium",
"ts_ms": 1736060282161,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"100674592\",\"100674696\"]",
"schema": "debezium_test",
"table": "orders",
"txId": 1190,
"lsn": 100674696,
"xmin": null
},
"op": "c",
"ts_ms": 1736060282378,
"transaction": null
}
{
"before": null,
"after": {
"order_id": 2,
"customer_id": 2,
"amount": 200,
"created_at": "2025-01-05T06:58:02.160738Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "debezium",
"ts_ms": 1736060282161,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"100674592\",\"100674592\"]",
"schema": "debezium_test",
"table": "orders",
"txId": 1190,
"lsn": 100674592,
"xmin": null
},
"op": "u",
"ts_ms": 1736060282378,
"transaction": null
}
3. Primary Key を設定していないテーブルでの動作検証
事前準備
Primary Key を持たないテーブルを作成します。
-- PK なしテーブル
CREATE TABLE debezium_test.logs (
log_message TEXT,
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);
PK が設定されていないテーブルに対しては、UPDATE
を実行しようとすると以下のエラーが発生する場合があります。
cannot update table "logs" because it does not have a replica identity and publishes updates
このエラーを回避するには、REPLICA IDENTITY
を FULL
に設定します。ただし、INSERT 操作のみであれば必須ではありません。
ALTER TABLE debezium_test.logs REPLICA IDENTITY FULL;
INSERT 操作(create events)
INSERT INTO debezium_test.logs (log_message)
VALUES
('System boot');
Confluent のトピックを確認すると、INSERT イベントが以下のように記録されています。
{
"before": null,
"after": {
"log_message": "System boot",
"created_at": "2025-01-05T07:00:54.979111Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "debezium",
"ts_ms": 1736060454979,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"100674896\",\"117448792\"]",
"schema": "debezium_test",
"table": "logs",
"txId": 1232,
"lsn": 117448792,
"xmin": null
},
"op": "c",
"ts_ms": 1736060455160,
"transaction": null
}
UPDATE 操作(update events)
Primary Key が無いテーブルでは、以下のようにレコードを特定して UPDATE します。
UPDATE debezium_test.logs
SET log_message = '[UPDATED] System boot'
WHERE log_message = 'System boot';
Confluent のトピックでは、UPDATE イベントが以下のように記録されています。
{
"before": {
"log_message": "System boot",
"created_at": "2025-01-05T07:00:54.979111Z"
},
"after": {
"log_message": "[UPDATED] System boot",
"created_at": "2025-01-05T07:00:54.979111Z"
},
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "debezium",
"ts_ms": 1736060585737,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"117448920\",\"117456544\"]",
"schema": "debezium_test",
"table": "logs",
"txId": 1263,
"lsn": 117456544,
"xmin": null
},
"op": "u",
"ts_ms": 1736060585803,
"transaction": null
}
DELETE 操作(delete events)
最後に、該当レコードを削除します。
DELETE FROM debezium_test.logs
WHERE log_message = '[UPDATED] System boot';
Confluent のトピックでは、DELETE イベントが以下のように記録されています。
{
"before": {
"log_message": "[UPDATED] System boot",
"created_at": "2025-01-05T07:00:54.979111Z"
},
"after": null,
"source": {
"version": "2.4.2.Final",
"connector": "postgresql",
"name": "debezium",
"ts_ms": 1736060654552,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"117456720\",\"117460176\"]",
"schema": "debezium_test",
"table": "logs",
"txId": 1280,
"lsn": 117460176,
"xmin": null
},
"op": "d",
"ts_ms": 1736060654671,
"transaction": null
}
まとめ
以上が、Azure Database for PostgreSQL から Confluent(Debezium)を用いた CDC の手順と、各種 DML 操作のイベントがどのようにトピックへ記録されるかの例となります。PK(Primary Key)の有無や、複数テーブルにまたがる DML などのパターンによって、Kafka に取り込まれるイベントの内容が異なる点にご注意ください。