0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure Database for PostgreSQL をソースとして Confluent へマネージド Debezium により CDC(変更データキャプチャー)の検証結果

Last updated at Posted at 2025-01-05

概要

本記事では、Azure Database for PostgreSQL をソースとして、Confluent で提供される Debezium を用いて CDC(Change Data Capture)を実行する手順と、その実行結果についてご紹介します。Azure Database for PostgreSQL に対する SQL の実行を Azure Data Studio で実施しています。

本記事は、以下のシリーズの一部です。

image.png

引用元:Data in Motion 実現へ:Data Streaming Platform である Confluent の全貌 #Kafka - Qiita

環境準備

Azure Database for PostgreSQL の環境構築

1. Azure Database for PostgreSQL Flexible Server の作成

image.png

2. wal_levelLOGICAL に設定

image.png

3. スキーマ作成

DROP SCHEMA IF EXISTS debezium_test CASCADE;
CREATE SCHEMA debezium_test;

image.png

4. PUBLICATION の作成

CREATE PUBLICATION debezium_publication FOR ALL TABLES;

image.png

5. REPLICATION 権限の付与

ALTER ROLE debezium WITH REPLICATION;

image.png

Confluent Cloud での Postgres CDC Source V2 (Debezium) コネクタの作成

以下の画面例のように、Confluent Cloud で Debezium コネクタを作成します。

image.png

image.png

image.png

image.png


動作検証

1. Primary Key を設定しているテーブルでの動作検証

事前準備

まずは customers テーブルを作成します。

-- テーブル作成
CREATE TABLE debezium_test.customers (
    id         BIGINT PRIMARY KEY,
    first_name VARCHAR(255) NOT NULL,
    last_name  VARCHAR(255) NOT NULL,
    email      VARCHAR(255) NOT NULL,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

image.png

INSERT 操作(create events)

以下のようにレコードを INSERT します。

-- 通常の INSERT
INSERT INTO debezium_test.customers (id, first_name, last_name, email)
VALUES
    (1, 'Anne', 'Kretchmar', 'annek@example.org');

image.png

Confluent のトピックを確認すると、INSERT イベントが以下のように記録されています。

{
  "before": null,
  "after": {
    "id": 1,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@example.org",
    "created_at": "2025-01-05T06:49:37.681496Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "postgresql",
    "name": "debezium",
    "ts_ms": 1736059777682,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[null,\"67376224\"]",
    "schema": "debezium_test",
    "table": "customers",
    "txId": 1060,
    "lsn": 67376224,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1736059777909,
  "transaction": null
}

image.png

UPDATE 操作(update events)

次にレコードを UPDATE します。

-- 既存レコードの更新
UPDATE debezium_test.customers
   SET email = 'updated_anne@example.org'
 WHERE id = 1;

Confluent のトピックを確認すると、UPDATE イベントが以下のように記録されています。

{
  "before": null,
  "after": {
    "id": 1,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "updated_anne@example.org",
    "created_at": "2025-01-05T06:49:37.681496Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "postgresql",
    "name": "debezium",
    "ts_ms": 1736059837649,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"67376544\",\"83896440\"]",
    "schema": "debezium_test",
    "table": "customers",
    "txId": 1075,
    "lsn": 83896440,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1736059837776,
  "transaction": null
}

image.png

DELETE 操作(delete events)

次にレコードを DELETE します。

-- レコード削除
DELETE FROM debezium_test.customers
 WHERE id = 1;

image.png

Confluent のトピックを確認すると、DELETE イベントが以下のように記録されています。

{
  "before": {
    "id": 1,
    "first_name": "",
    "last_name": "",
    "email": "",
    "created_at": "1970-01-01T00:00:00.000000Z"
  },
  "after": null,
  "source": {
    "version": "2.4.2.Final",
    "connector": "postgresql",
    "name": "debezium",
    "ts_ms": 1736059892606,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"83896616\",\"83899008\"]",
    "schema": "debezium_test",
    "table": "customers",
    "txId": 1089,
    "lsn": 83899008,
    "xmin": null
  },
  "op": "d",
  "ts_ms": 1736059893043,
  "transaction": null
}

image.png

Primary Key の値を更新(Primary key updates)

Primary key(id)の値を更新する操作を実施します。

-- 通常の INSERT
INSERT INTO debezium_test.customers (id, first_name, last_name, email)
VALUES
    (1, 'Anne', 'Kretchmar', 'annek@example.org');

UPDATE debezium_test.customers
   SET id = 10
 WHERE id = 1;

image.png

Confluent のトピックでは、id=1 のレコードを削除→id=10 のレコードを挿入する形でイベントが記録されます。

{
  "before": {
    "id": 1,
    "first_name": "",
    "last_name": "",
    "email": "",
    "created_at": "1970-01-01T00:00:00.000000Z"
  },
  "after": null,
  "source": {
    "version": "2.4.2.Final",
    "connector": "postgresql",
    "name": "debezium",
    "ts_ms": 1736059959049,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"83902280\",\"83902280\"]",
    "schema": "debezium_test",
    "table": "customers",
    "txId": 1106,
    "lsn": 83902280,
    "xmin": null
  },
  "op": "d",
  "ts_ms": 1736059959282,
  "transaction": null
}

image.png

""

image.png

{
  "before": null,
  "after": {
    "id": 10,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@example.org",
    "created_at": "2025-01-05T06:52:36.387167Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "postgresql",
    "name": "debezium",
    "ts_ms": 1736059959049,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"83902280\",\"83902280\"]",
    "schema": "debezium_test",
    "table": "customers",
    "txId": 1106,
    "lsn": 83902280,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1736059959282,
  "transaction": null
}

image.png

2. 2 つのテーブルを結合した DML の動作検証

事前準備

例として、orders テーブルと tmp_orders テーブルを用意します。

-- 例: orders テーブル
CREATE TABLE debezium_test.orders (
    order_id    BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    amount      BIGINT,
    created_at  TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE debezium_test.tmp_orders (
    order_id    BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    amount      BIGINT,
    created_at  TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

image.png

INSERT INTO debezium_test.orders (order_id, customer_id, amount)
VALUES
    (1, 1, 0);

INSERT INTO debezium_test.tmp_orders (order_id, customer_id, amount)
VALUES
    (1, 1, 100),
    (2, 2, 0);

image.png

DELETE 操作 (JOIN 例)

2 つのテーブルを JOIN して orders テーブルを DELETE します。

DELETE FROM debezium_test.orders o
 USING debezium_test.tmp_orders c
 WHERE o.order_id = c.order_id;

image.png

Confluent のトピックを見ると、DELETE イベントが以下のように記録されています。

{
  "before": {
    "order_id": 1,
    "customer_id": 0,
    "amount": null,
    "created_at": "1970-01-01T00:00:00.000000Z"
  },
  "after": null,
  "source": {
    "version": "2.4.2.Final",
    "connector": "postgresql",
    "name": "debezium",
    "ts_ms": 1736060074117,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"83997784\",\"83998040\"]",
    "schema": "debezium_test",
    "table": "orders",
    "txId": 1137,
    "lsn": 83998040,
    "xmin": null
  },
  "op": "d",
  "ts_ms": 1736060074330,
  "transaction": null
}

image.png

""

image.png

INSERT SELECT 操作

tmp_orders テーブルのデータをもとに orders テーブルへ INSERT を実行します。

INSERT INTO debezium_test.orders
SELECT
    order_id,
    customer_id,
    amount,
    created_at
  FROM debezium_test.tmp_orders;

image.png

Confluent のトピックでは、INSERT イベントが以下のように記録されています。

{
  "before": null,
  "after": {
    "order_id": 1,
    "customer_id": 1,
    "amount": 100,
    "created_at": "2025-01-05T06:54:29.435440Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "postgresql",
    "name": "debezium",
    "ts_ms": 1736060196651,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"83998160\",\"100667016\"]",
    "schema": "debezium_test",
    "table": "orders",
    "txId": 1167,
    "lsn": 100667016,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1736060196778,
  "transaction": null
}

image.png

MERGE 操作

MERGE 操作の動作を確認するためにデータを用意し、tmp_ordersorders を突き合わせます。

TRUNCATE TABLE debezium_test.tmp_orders;
INSERT INTO debezium_test.tmp_orders (order_id, customer_id, amount)
VALUES
    (2, 2, 200),
    (3, 3, 300);
MERGE INTO debezium_test.orders AS target
USING debezium_test.tmp_orders AS source
ON (target.customer_id = source.customer_id)
WHEN MATCHED THEN
  UPDATE SET
    amount = source.amount,
    customer_id = source.customer_id,
    created_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
  INSERT (order_id, customer_id, amount)
  VALUES (source.order_id, source.customer_id, source.amount);

image.png

Confluent のトピックでは、INSERT イベントと UPDATE イベントが以下のように記録されています。

{
  "before": null,
  "after": {
    "order_id": 3,
    "customer_id": 3,
    "amount": 300,
    "created_at": "2025-01-05T06:58:02.160738Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "postgresql",
    "name": "debezium",
    "ts_ms": 1736060282161,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"100674592\",\"100674696\"]",
    "schema": "debezium_test",
    "table": "orders",
    "txId": 1190,
    "lsn": 100674696,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1736060282378,
  "transaction": null
}

image.png

{
  "before": null,
  "after": {
    "order_id": 2,
    "customer_id": 2,
    "amount": 200,
    "created_at": "2025-01-05T06:58:02.160738Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "postgresql",
    "name": "debezium",
    "ts_ms": 1736060282161,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"100674592\",\"100674592\"]",
    "schema": "debezium_test",
    "table": "orders",
    "txId": 1190,
    "lsn": 100674592,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1736060282378,
  "transaction": null
}

image.png

3. Primary Key を設定していないテーブルでの動作検証

事前準備

Primary Key を持たないテーブルを作成します。

-- PK なしテーブル
CREATE TABLE debezium_test.logs (
    log_message TEXT,
    created_at  TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
);

PK が設定されていないテーブルに対しては、UPDATE を実行しようとすると以下のエラーが発生する場合があります。

cannot update table "logs" because it does not have a replica identity and publishes updates

image.png

このエラーを回避するには、REPLICA IDENTITYFULL に設定します。ただし、INSERT 操作のみであれば必須ではありません。

ALTER TABLE debezium_test.logs REPLICA IDENTITY FULL;

image.png

INSERT 操作(create events)

INSERT INTO debezium_test.logs (log_message)
VALUES
   ('System boot');

Confluent のトピックを確認すると、INSERT イベントが以下のように記録されています。

{
  "before": null,
  "after": {
    "log_message": "System boot",
    "created_at": "2025-01-05T07:00:54.979111Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "postgresql",
    "name": "debezium",
    "ts_ms": 1736060454979,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"100674896\",\"117448792\"]",
    "schema": "debezium_test",
    "table": "logs",
    "txId": 1232,
    "lsn": 117448792,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1736060455160,
  "transaction": null
}

image.png

UPDATE 操作(update events)

Primary Key が無いテーブルでは、以下のようにレコードを特定して UPDATE します。

UPDATE debezium_test.logs
   SET log_message = '[UPDATED] System boot'
 WHERE log_message = 'System boot';

image.png

Confluent のトピックでは、UPDATE イベントが以下のように記録されています。

{
  "before": {
    "log_message": "System boot",
    "created_at": "2025-01-05T07:00:54.979111Z"
  },
  "after": {
    "log_message": "[UPDATED] System boot",
    "created_at": "2025-01-05T07:00:54.979111Z"
  },
  "source": {
    "version": "2.4.2.Final",
    "connector": "postgresql",
    "name": "debezium",
    "ts_ms": 1736060585737,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"117448920\",\"117456544\"]",
    "schema": "debezium_test",
    "table": "logs",
    "txId": 1263,
    "lsn": 117456544,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1736060585803,
  "transaction": null
}

image.png

DELETE 操作(delete events)

最後に、該当レコードを削除します。

DELETE FROM debezium_test.logs
 WHERE log_message = '[UPDATED] System boot';

image.png

Confluent のトピックでは、DELETE イベントが以下のように記録されています。

{
  "before": {
    "log_message": "[UPDATED] System boot",
    "created_at": "2025-01-05T07:00:54.979111Z"
  },
  "after": null,
  "source": {
    "version": "2.4.2.Final",
    "connector": "postgresql",
    "name": "debezium",
    "ts_ms": 1736060654552,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"117456720\",\"117460176\"]",
    "schema": "debezium_test",
    "table": "logs",
    "txId": 1280,
    "lsn": 117460176,
    "xmin": null
  },
  "op": "d",
  "ts_ms": 1736060654671,
  "transaction": null
}

image.png

まとめ

以上が、Azure Database for PostgreSQL から Confluent(Debezium)を用いた CDC の手順と、各種 DML 操作のイベントがどのようにトピックへ記録されるかの例となります。PK(Primary Key)の有無や、複数テーブルにまたがる DML などのパターンによって、Kafka に取り込まれるイベントの内容が異なる点にご注意ください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?