はじめに
PostgreSQLの論理レプリケーション(Logical Replication)に関して調査し、検証した内容を記載します。
この記事では、DB全体の論理レプリケーションに加えて、列リスト、行フィルタを用いて、テーブルの特定の行や列をレプリケーションする方法も検証します。
また、PK重複による簡単なコンフリクトを起こして、それを解消することも試してみたいと思います。
環境情報
環境情報は以下の通りです。
15.4から17.2へ論理レプリケーションを設定しています。
Role | DB Version | OS |
---|---|---|
パブリッシャー | PostgreSQL 15.4 | Rocky Linux 8.10 |
サブスクライバー | PostgreSQL 17.2 | Rocky Linux 8.10 |
目次
- PostgreSQLのレプリケーションの種類
- PostgreSQLのレプリケーションの単位
- レプリケーション検証-DB単位
- パブリッシャー設定-DB単位
- サブスクライバー設定-DB単位
- データ同期の確認-DB単位
- レプリケーション検証-列リストと行フィルタ
- パブリッシャー設定-列リストと行フィルタ
- サブスクライバー設定-列リストと行フィルタ
- データ同期の確認-列リストと行フィルタ
- コンフリクト
- おわりに
PostgreSQLのレプリケーションの種類
PostgreSQLにはストリーミングレプリケーション(Streaming Replication)
と、論理レプリケーション(Logical Replication)の2種類があります。
ストリーミングレプリケーションはメジャーバージョンを跨いだレプリケーションができず、複製の単位はDBクラスタ全体となります。
それに対して、論理レプリケーションは、メジャーバージョン間のレプリケーションが可能で、複製の単位も後述の通り柔軟に設定可能です。
このような特性により、DBのアップグレードやDB統合など様々な場面で利用できます。
今回は、DBのアップグレードという観点でPostgreSQL 15からPostgreSQL 17に対して、論理レプリケーションを設定してみたいと思います。
※メジャーバージョン間の論理レプリケーションの互換性は必ずしも保証されているわけではありません。特定のバージョン間の互換性に関しては、公式ドキュメントを参照してください。
PostgreSQLのレプリケーションの単位
論理レプリケーションは比較的柔軟な単位で設定可能です。
DB単位から、スキーマやテーブル単位でレプリケーションができます。
また、レプリケーションするテーブルの列を絞ったり、行をフィルタリングしたりする機能もあります。
- DB単位
- スキーマ単位
- テーブル単位
- テーブル内の列、行単位
パブリッシャーとサブスクライバー
データの複製元をパブリッシャー、データの複製先をサブスクライバーと呼びます。
パブリッシャー側にパブリケーションというオブジェクトを作成し、サブスクライバー側にサブスクリプションという接続先のパブリケーションを指定したオブジェクトを作成します。
レプリケーション検証-DB単位
まずは、DB全体の論理レプリケーションに関して検証を行います。
パブリッシャー設定-DB単位
PostgreSQLの設定ファイルであるpostgresql.confを編集して、
wal_level パラメータを logical に設定します。
wal_levelの設定変更を反映するにはDBを再起動します。
pg_ctl stop -D $PGDATA
vi $PGDATA/postgresql.conf
###
wal_level = logical
###
pg_ctl start -D $PGDATA
=# show wal_level;
wal_level
───────────
logical
(1 row)
テストデータを作成します。
CREATE TABLE t1(id INT PRIMARY KEY, col1 VARCHAR(10));
INSERT INTO t1 VALUES(1, 'A');
INSERT INTO t1 VALUES(2, 'B');
INSERT INTO t1 VALUES(3, 'C');
INSERT INTO t1 VALUES(4, 'D');
INSERT INTO t1 VALUES(5, 'E');
INSERT INTO t1 VALUES(6, 'F');
INSERT INTO t1 VALUES(7, 'G');
INSERT INTO t1 VALUES(8, 'H');
INSERT INTO t1 VALUES(9, 'I');
INSERT INTO t1 VALUES(10, 'J');
パブリケーションを作成します。
CREATE PUBLICATION pub FOR ALL TABLES;
レプリケーション用ユーザーを作成して、権限を付与します。
CREATE ROLE repluser LOGIN REPLICATION PASSWORD 'repluser';
GRANT pg_read_all_data TO repluser;
サブスクライバー設定-DB単位
パブリッシャーと同じ定義でテーブルを作成します。
CREATE TABLE t1(id INT PRIMARY KEY, col1 VARCHAR(10));
サブスクリプションを作成します。
CREATE SUBSCRIPTION sub CONNECTION 'host=<プライマリホスト> port=<ポート> user=repluser password=repluser dbname=postgres' PUBLICATION pub;
CREATE SUBSCRIPTION実行時にパブリッシャーからデータの初期同期が行われます。
サブスクライバ側のテーブルに既にデータが入っていて、PKなどで重複する場合は、以下のようなエラーが発生します。
2025-03-10 16:41:01.578 JST [] [] [] [39453:67ce978d.9a1d] ERROR: 23505: duplicate key value violates unique constraint "t1_pkey"
2025-03-10 16:41:01.578 JST [] [] [] [39453:67ce978d.9a1d] DETAIL: Key (id)=(1) already exists.
データ同期の確認-DB単位
サブスクライバー側でデータ連携結果を確認します。
=# SELECT * FROM t1;
id │ col1
────┼──────
1 │ A
2 │ B
3 │ C
4 │ D
5 │ E
6 │ F
7 │ G
8 │ H
9 │ I
10 │ J
パブリッシャー側でデータを更新して、同期されているか確認してみます。
t1にデータを1行追加します。
-- パブリッシャー側のデータ更新
=# INSERT INTO t1 VALUES(11, 'K');
INSERT 0 1
Time: 2.884 ms
サブスクライバー側でデータを確認します。
データが1行増えたことを確認できました。
-- サブスクライバー側でデータ確認
=# SELECT * FROM t1;
id │ col1
────┼──────
1 │ A
2 │ B
3 │ C
4 │ D
5 │ E
6 │ F
7 │ G
8 │ H
9 │ I
10 │ J
11 │ K
レプリケーション検証-列リストと行フィルタ
次に、テーブル単位で、列リスト、行フィルタを利用した論理レプリケーションを検証します。
パブリッシャー設定-列リストと行フィルタ
テーブルを作成してデータを投入します。
id = 1 は後ほど検証でデータを追加するので空けておきます。
CREATE TABLE t2(id INT PRIMARY KEY, col1 VARCHAR(10), col2 VARCHAR(10));
INSERT INTO t2 VALUES(2, 'B', 'dummy');
INSERT INTO t2 VALUES(3, 'C', 'dummy');
INSERT INTO t2 VALUES(4, 'D', 'dummy');
-- パブリッシャー側のt2テーブル全体の結果
=# SELECT * FROM t2;
id │ col1 │ col2
────┼──────┼───────
2 │ B │ dummy
3 │ C │ dummy
4 │ D │ dummy
パブリケーションを以下の条件で作成します。
列リスト... id, col1列のみをレプリケーションします。(col2を除外)
行フィルタ... id 3 以上をレプリケーションします。(id 2 以下は除外)
CREATE PUBLICATION pub FOR TABLE t2(id ,col1) WHERE (id >= 3);
サブスクライバー設定-列リストと行フィルタ
パブリッシャーと同じ定義でテーブルを作成します。
CREATE TABLE t2(id INT PRIMARY KEY, col1 VARCHAR(10), col2 VARCHAR(10));
サブスクリプションを作成します。
こちらはDB全体の検証で使用したコマンドと同じです。
CREATE SUBSCRIPTION sub CONNECTION 'host=<プライマリホスト> port=<ポート> user=repluser password=repluser dbname=postgres' PUBLICATION pub;
データ同期の確認-列リストと行フィルタ
サブスクライバー側でデータ連携結果を確認します。
col2列の値はや、id 4 以下の行はレプリケーションされていません。
列リスト、行フィルタの設定がうまく効いていることが確認できました。
=# SELECT * FROM t2;
id │ col1 │ col2
────┼──────┼────────
3 │ C │ <NULL>
4 │ D │ <NULL>
更に検証のためパブリッシャー側でデータを更新します。
id = 1 は行フィルタの条件で除外されます。
id = 5 はレプリケーションされる想定です。
INSERT INTO t2 VALUES(1, 'A', 'dummy');
INSERT INTO t2 VALUES(5, 'E', 'dummy');
-- パブリッシャー側のt2テーブル全体の結果
=# SELECT * FROM t2;
id │ col1 │ col2
────┼──────┼───────
2 │ B │ dummy
3 │ C │ dummy
4 │ D │ dummy
1 │ A │ dummy
5 │ E │ dummy
サブスクライバー側で確認します。
想定どおり、id = 1 は除外され、id = 5 のみ増えています。
col2 列は列リストが効いているのでNULLです。
=# SELECT * FROM t2;
id │ col1 │ col2
────┼──────┼────────
3 │ C │ <NULL>
4 │ D │ <NULL>
5 │ E │ <NULL>
コンフリクト
この状態で、データを更新して意図的にコンフリクトを起こしてみます。
サブスクライバー側で id = 6 をインサートします。
データはエラーなく挿入されます。
=# INSERT INTO t2 VALUES(6, 'F', 'dummy');
INSERT 0 1
Time: 14.372 ms
=# SELECT * FROM t2;
id │ col1 │ col2
────┼──────┼────────
3 │ C │ <NULL>
4 │ D │ <NULL>
5 │ E │ <NULL>
6 │ F │ dummy
パブリッシャー側でも同じデータを挿入してみます。
=# INSERT INTO t2 VALUES(6, 'F', 'dummy');
INSERT 0 1
Time: 1.958 ms
サブスクリプション側のDBログを見てみます。
以下の通り、id = 6 の適用で、PK制約によってエラーとなり、バックグラウンドプロセスの"logical replication apply worker"が異常終了していることが確認できます。
コンフリクトが発生した場合、サブスクリプション側の論理レプリケーションワーカーは再起動して、適用エラーを繰り返します。
従って、論理レプリケーションを組んだ場合は、アプリケーションの不具合やオペレーションミスなどの事故でサブスクリプション側が意図せず更新されてしまうことを監視した方が良いかもしれません。
2025-03-11 18:31:41.510 JST [] [] [] [66132:67d002fd.10254] ERROR: 23505: duplicate key value violates unique constraint "t2_pkey"
2025-03-11 18:31:41.510 JST [] [] [] [66132:67d002fd.10254] DETAIL: Key (id)=(6) already exists.
2025-03-11 18:31:41.510 JST [] [] [] [66132:67d002fd.10254] CONTEXT: processing remote data for replication origin "pg_16583" during message type "INSERT" for replication target relation "public.t2" in transaction 102954, finished at 0/1A4A11A8
2025-03-11 18:31:41.510 JST [] [] [] [66132:67d002fd.10254] LOCATION: _bt_check_unique, nbtinsert.c:673
2025-03-11 18:31:41.513 JST [] [] [] [19750:67ce89ba.4d26] LOG: 00000: background worker "logical replication apply worker" (PID 66132) exited with exit code 1
2025-03-11 18:31:41.513 JST [] [] [] [19750:67ce89ba.4d26] LOCATION: LogChildExit, postmaster.c:3083
この様子は pg_stat_subscription_stats テーブルでも確認できます。
apply_error_count が上昇を続けます。
ログ監視だけでなく、このテーブルのエラーカウントを監視することでもコンフリクトが検知できます。
=# select * from pg_stat_subscription_stats;
subid │ subname │ apply_error_count │ sync_error_count │ stats_reset
───────┼─────────┼───────────────────┼──────────────────┼─────────────
16583 │ sub │ 172 │ 0 │ <NULL>
更にパブリッシャー側で id = 7 を追加しました。
=# INSERT INTO t2 VALUES(7, 'G', 'dummy');
INSERT 0 1
Time: 1.987 ms
サブスクライバー側で確認しても id = 7 はレプリケーションされていません。
前述の通り、論理レプリケーションワーカーがエラーループに陥っているためです。
=# SELECT * FROM t2;
id │ col1 │ col2
────┼──────┼────────
3 │ C │ <NULL>
4 │ D │ <NULL>
5 │ E │ <NULL>
6 │ F │ dummy
データの整合性を取ることでコンフリクトを解消してみます。
id = 6 を削除します。
コンフリクトが解消し、id = 6, 7 が複製されたことが確認できます。
=# DELETE FROM t2 WHERE id = 6;
DELETE 1
-- サブスクライバー側でデータを確認
=# SELECT * FROM t2;
id │ col1 │ col2
────┼──────┼────────
3 │ C │ <NULL>
4 │ D │ <NULL>
5 │ E │ <NULL>
6 │ F │ <NULL>
7 │ G │ <NULL>
おわりに
PostgreSQL 15から17に論理レプリケーションでデータが複製されたことを確認できました。
アップグレードなどで論理レプリケーションを使用する場面は多く発生すると思われますので、引き続き、論理レプリケーションに関して調査をしたいと思います。