1. はじめに
ここ最近、ClickHouse の Managed Postgres(公開ベータの「Postgres managed by ClickHouse」)と ClickPipes CDC(Postgres の変更をニアリアルタイムで ClickHouse へ複製する仕組み)を、遅延・HTAP・dedup といった角度で単一ソース構成で検証してきました。今回はその延長で、複数の Postgres を 1 つの ClickHouse に集約(fan-in)する構成を試します。
「別々に動いている 2 つの業務 DB を、1 つの DWH(データウェアハウス)などにまとめて横断分析したい」というのはよくある話です。今回はソースを 2 台の Postgres に絞り、2 つのソースをきちんと集約して横断クエリを打てる形まで持っていきます。
1.1. 今回の検証ゴール
| # | ゴール | OK条件 |
|---|---|---|
| G1 | 2 台の Postgres を 1 つの ClickHouse に集約できる | 2 本の ClickPipe で 2 ソースが ClickHouse 側に取り込まれ、データがそろう |
| G2 | 集約した「統合テーブル」を正しく作れる | ソース識別子をキーに入れて両ソースが共存し、片方の CDC UPDATE がもう片方に波及せず反映される |
1.2. 結論(先出し)
- 複数の ClickPipe を 1 つの ClickHouse サービスに集約するのは公式にサポートされています。各ソースはそれぞれ専用の database / テーブルに取り込まれます
- 2 ソースを 1 つの統合テーブルにまとめるときは、「どのソースから来たか」を表す識別子をキーに含める必要があります。含めないと、ReplacingMergeTree の重複排除でソースの片方が残りません
- このキー設計は DWH で多ソースを統合するときの定石(Kimball のサロゲートキー / Data Vault の Record Source + Hash Key1) と同じ考え方です。ソース識別子を含めたキーなら、CDC の UPDATE も同じ注文の行へ反映されます
- 横断クエリは UNION でも書けますが、クエリのたびに両テーブルを読んで重複排除(
FINAL)する分のコストがかかります。頻繁・大規模に回すなら物理統合テーブルの方が向きます
2. 検証環境
| 項目 | 値 |
|---|---|
| ソース DB | ClickHouse Managed Postgres ×2(PostgreSQL 18.4、Tokyo / ap-northeast-1) |
| ソース A | pg_jp |
| ソース B | pg_us |
| ターゲット | ClickHouse Cloud(最小構成) |
| CDC | ClickPipes Postgres CDC ×2(Sync interval = 10 秒) |
取り込み先の ClickHouse database をソースごとに分け(jp / us)、その database 名をソース識別子として使います。理由は 3 章・4 章で説明します。
3. 集約の仕組み
3.1. ClickPipes CDC と ReplacingMergeTree
ClickPipes の Postgres CDC は、ソースの変更を Sync interval ごとにポーリングして ClickHouse へ流し込みます(既定 60 秒・推奨下限 10 秒。Controlling the syncing of a Postgres ClickPipe)。宛先テーブルは ReplacingMergeTree 系のエンジンで自動生成されます。
ReplacingMergeTree は、同じキー(ORDER BY で指定した列)の行を、最も新しいバージョンの 1 行にまとめてくれるエンジンです。CDC ではこれを次のように使います。
- INSERT / UPDATE は、
_peerdb_version(バージョンを表す値)が異なる新しい行として追加されます(その場で上書きはしない) - DELETE は、
_peerdb_is_deleted列でマークした行として追加されます - ClickHouse は裏で複数のデータのかたまりを定期的に統合(マージ)し、そのときに**同じキーの行を、
_peerdb_versionが最大の 1 行に重複排除(dedup)**します - マージは非同期なので、クエリ時に最新の 1 行だけを読みたいときは
FINALを付けます(マージ完了を待たずに重複排除済みの結果が得られます)
実際に pipe が作った宛先テーブルの定義は次のとおりでした。
CREATE TABLE jp.orders
(
`order_id` Int64,
`customer_id` Int64,
`amount` Decimal(12, 2),
`region` String,
`updated_at` DateTime64(6),
`_peerdb_synced_at` DateTime64(9) DEFAULT now64(),
`_peerdb_is_deleted` UInt8,
`_peerdb_version` UInt64
)
ENGINE = SharedReplacingMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}', _peerdb_version)
PRIMARY KEY order_id
ORDER BY order_id
ここで重要なのは、ORDER BY order_id が重複排除のキーを兼ねている点です(ReplacingMergeTree 公式 Docs)。この「dedup キー = ORDER BY」という性質が、今回のポイントになります。
3.2. 複数ソースをどう取り込むか
ClickPipes の FAQ には、複数ソースの集約について次の記述があります(ClickPipes for Postgres FAQ)。
You can create separate ClickPipes from different Postgres instances (including across AWS regions) into one or more ClickHouse services.
つまり、異なる Postgres インスタンスから別々の ClickPipe を作り、1 つの ClickHouse サービスに集約できます。一方で「1 つの宛先テーブルを 2 つの pipe で共有する」という使い方は想定されておらず、各 pipe は自分の宛先テーブルを作成・管理します。したがって構成はこうなります。
2 ソースはそれぞれ専用のテーブル(jp.orders / us.orders)に取り込まれ、横断分析のための「統合テーブル」は ClickHouse 側で別に作ることになります。この統合テーブルをどう作るかが、4 章・5 章の本題です。
4. 検証データと取り込み
4.1. わざと PK を重複させる
両ソースに同じスキーマを置き、各ソースに 10 万件ずつ、order_id を 両方とも 1〜100000 で採番した注文データを入れました。独立して動く 2 つの業務 DB はそれぞれ 1 から採番するため、統合したときに order_id が被ります。今回はそれを意図的に作ります。
-- 両ソース(pg_jp / pg_us)に同一適用
CREATE SCHEMA app;
CREATE TABLE app.orders (
order_id bigint PRIMARY KEY,
customer_id bigint,
amount numeric(12,2),
region text,
updated_at timestamptz DEFAULT clock_timestamp()
);
-- 各ソースに同じ order_id 1..100000 を投入(pg_jp は region='JP'、pg_us は region='US')
INSERT INTO app.orders (order_id, customer_id, amount, region)
SELECT g, ((g - 1) % 1000) + 1, round((random() * 1000)::numeric, 2), 'JP'
FROM generate_series(1, 100000) AS g;
両ソースとも 100,000 行、order_id は 1〜100000 でレンジが完全に重なります。なお region は注文データの一属性であって、ソースを区別する識別子ではありません(後述のキー設計では、取り込み元の database をソース識別子として使います)。
4.2. 取り込み先はソースごとに database を分ける
ClickPipe は、宛先テーブル名にソースのスキーマ名をプレフィックスとして付けるかを選べます(このトグルは既定ではオン。app.orders → app_orders)。ただし複数ソースを同じ database に同名で取り込もうとすると衝突します(実際、宛先を同じにしたら table cdc_probe exists and is not empty で弾かれました)。
そこで今回は、取り込み先 database をソースごとに分け(jp / us)、database 名そのものをソース識別子として使いました。テーブル名のリネームに頼らず衝突を避けられて、構成もすっきりします。
これで jp.orders(10 万行)と us.orders(10 万行)が、ソースごとに分かれて取り込まれました。両 pipe とも Running で、Sync は 10 秒前後で 1 バッチを処理しています。
| pipe_jp(Running) | pipe_us(Running) |
|---|---|
![]() |
![]() |
5. 統合テーブルをどう作るか
5.1. 取り込みの確認
| テーブル | 行数 | region |
|---|---|---|
jp.orders |
100,000 | JP |
us.orders |
100,000 | US |
ここまでで G1(2 ソースを ClickHouse に集約する)は達成です。2 ソースは別 database の別テーブルに、それぞれ 10 万行そのまま入っています。問題が出るのは、この 2 つを 1 つの統合テーブルにまとめる段です。
5.2. PK だけをキーに統合すると、片方のソースが残らない
横断分析のために「全体の orders を 1 テーブルにまとめたい」と考えるのは自然です。そして pipe が作るテーブルが ORDER BY order_id なので、統合テーブルも同じノリで order_id をキーに作りがちです。
-- 素朴な統合: dedup キー = order_id 単独
CREATE TABLE analytics.orders_naive
(
order_id Int64, customer_id Int64, amount Decimal(12,2), region String,
updated_at DateTime64(6), _peerdb_version UInt64
)
ENGINE = ReplacingMergeTree(_peerdb_version)
ORDER BY order_id;
INSERT INTO analytics.orders_naive SELECT order_id, customer_id, amount, region, updated_at, _peerdb_version FROM jp.orders;
INSERT INTO analytics.orders_naive SELECT order_id, customer_id, amount, region, updated_at, _peerdb_version FROM us.orders;
結果はこうなりました。
| クエリ | 行数 |
|---|---|
SELECT count() FROM analytics.orders_naive |
200,000 |
SELECT count() FROM analytics.orders_naive FINAL |
100,000 |
挿入直後は 200,000 行ありますが、重複排除を効かせた FINAL では 100,000 行に半減します。しかも region の内訳を見ると、
| region | 行数(FINAL) |
|---|---|
| US | 100,000 |
| JP | 0 |
JP 側が 1 行も残りませんでした。 order_id を共有する JP と US の行が「同じ行の新しいバージョン」と判定され、後から入れた US が JP を上書きした形です。バージョン(_peerdb_version)が両ソースとも 0(初回スナップショット)で並んだため、後勝ちになっています。
「2 つのソースを 1 つにまとめたら、片方のデータが残らなかった」という、複数ソース集約で踏みやすいつまずきです。
5.3. ソース識別子をキーに入れて、両方を残す
この対応案は「統合テーブルに『どのソースから来たか』を表す列を持たせ、それをキーの先頭に入れる」というものです。ここでは取り込み元の database 名(pg_jp / pg_us)を record_source 列として持たせます。さらに、record_source と order_id から hk(ハッシュキー。2 つの値から一意な値を作るハッシュ) を作り、これを ORDER BY(= dedup キー)にします。
CREATE TABLE analytics.orders_dv
(
order_id Int64, customer_id Int64, amount Decimal(12,2), region String,
updated_at DateTime64(6),
record_source LowCardinality(String), -- どのソース由来か
hk UInt64, -- record_source + order_id のハッシュ
_peerdb_version UInt64
)
ENGINE = ReplacingMergeTree(_peerdb_version)
ORDER BY hk;
INSERT INTO analytics.orders_dv
SELECT order_id, customer_id, amount, region, updated_at,
'pg_jp', cityHash64('pg_jp', order_id), _peerdb_version
FROM jp.orders;
INSERT INTO analytics.orders_dv
SELECT order_id, customer_id, amount, region, updated_at,
'pg_us', cityHash64('pg_us', order_id), _peerdb_version
FROM us.orders;
ORDER BY (record_source, order_id) という複合キーでも同じことができますが、ここでは後述の Data Vault の Hash Key に寄せてハッシュ 1 列にしました。結果は次のとおりです。
| 確認項目 | 結果 |
|---|---|
| 行数(FINAL) | 200,000 |
hk の異なり数(FINAL) |
200,000(衝突なし) |
pg_jp / pg_us の内訳 |
各 100,000 |
order_id が被っていても、ソースが違えば hk が変わるので、両ソースの 10 万行がそのまま共存します。
5.4. CDC の UPDATE がこのキーで正しく扱えるか
統合テーブルが「集約した瞬間だけ」正しくても意味がありません。ソース側で UPDATE が起きたとき、統合テーブルの該当行が正しく更新されるかを確認します。
まず pg_jp の order_id=1 を更新します。
-- pg_jp 側で実行
UPDATE app.orders SET amount = 999999.99, updated_at = clock_timestamp() WHERE order_id = 1;
この更新は、約 10 秒後に jp.orders へ より大きい _peerdb_version の新しい行として届きました。
| order_id | amount | _peerdb_version |
|---|---|---|
| 1 | 941.22 | 0 |
| 1 | 999999.99 | 1781084051091454551 |
FINAL は最新(999999.99)を返します。バージョンが 0 から LSN ベースの大きな値に増えており、UPDATE が新しいバージョンとして届いていることが分かります。
この更新を orders_dv に取り込んだうえで order_id=1 を見ると、
| record_source | order_id | amount | _peerdb_version |
|---|---|---|---|
| pg_jp | 1 | 999999.99 | 1781084051091454551 |
| pg_us | 1 | 854.31 | 0 |
- 別ソースの同じ
order_id=1が両方とも残る(hkが別なので重複排除されない) - pg_jp の UPDATE は pg_jp の
order_id=1の行だけに反映される(999999.99) - pg_us の
order_id=1は まったく影響を受けない(854.31 のまま)
ソースが違えば衝突せず、CDC の UPDATE も同じ注文の行へ正しく反映される、という形になりました。これで G2 も達成です。
5.5. 統合テーブルを作らず、UNION でまとめる場合
物理的な統合テーブルを作らず、jp.orders と us.orders をクエリ時に UNION でまとめる手もあります。
CREATE VIEW analytics.orders_union AS
SELECT order_id, customer_id, amount, region, updated_at FROM jp.orders FINAL
UNION ALL
SELECT order_id, customer_id, amount, region, updated_at FROM us.orders FINAL;
別テーブルのままなので order_id の衝突は起きず、SELECT count() FROM analytics.orders_union は 200,000 を返します。UNION ALL は 2 つのテーブルをそれぞれ読んで連結するだけですが、重複排除のために各テーブルへ付けた FINAL の分、クエリのたびに両テーブルを読んで dedup するコストがかかります。公式 Docs にも、FINAL は「通常はマージ時に行われる処理をクエリ時にメモリ上で行うため、追加の計算リソースとメモリを要する」とあります(FROM 句の FINAL 修飾子 | ClickHouse Docs)。さらに、UNION を含むクエリは Materialized View にできないという制約もあります(ClickHouse #17058)。
そのため UNION は「たまに横断クエリを打つ」用途には手軽ですが、横断分析を頻繁・大規模に回すなら、5.3 章のようにソース識別子を含むキーで物理統合テーブルを持つ方が向きます。
6. 考察
6.1. PK の重複は「複数ソースの統合キー設計」の問題
5.2 章の現象そのものは「ReplacingMergeTree の dedup キーは ORDER BY」という仕様の素直な帰結です。独立した 2 つの Postgres は別々に 1 から採番するので、order_id という値の空間が被り、order_id だけをキーにした統合テーブルでは片方が残りません。
これは ClickHouse 固有の話というより、複数のソースシステムを 1 つの統合層にまとめるときのキー設計という、DWH では古くからあるテーマです。一例をあげると次のような対応があります。
| 流派 | キー設計 | 今回との対応 |
|---|---|---|
| Kimball(ディメンショナルモデル) | ソースの自然キーをそのまま使わず、DWH側でサロゲートキーを採番して一意化する(Surrogate Keys / Kimball Group) | ソース識別子+order_id から一意なキーを作る |
| Data Vault 2.0 | 全行に Record Source(どのソース由来か)+ Load Date + Hash Key(業務キー由来) を持たせる(Data Vault on Amazon Redshift / AWS) |
record_source が Record Source、cityHash64(record_source, order_id) が Hash Key |
共通する原則は「ソースの自然キー(連番 PK)を、全ソースで一意なキーとして使わない。サロゲート、または『業務キー+ソース識別子』で一意化する」です。5.3 章で record_source を足したのは、この定石を ClickHouse の ORDER BY に落としたものでした。
6.2. ClickHouse でキーをどう選ぶか
DWH 一般論に加え、ClickHouse の ReplacingMergeTree を利用していることで、もう 1 つ条件が追加になります。dedup キー(= ORDER BY)に使う列は、その注文を通して変わらない値でなければなりません。途中で変わる列をキーに入れると、同じ注文が別物として扱われ、いつまでも 1 行にまとまりません。
このため、行を入れるたびに採番するような「中身と無関係なサロゲートキー」は、ClickHouse × CDC の dedup キーには向きません(CDC で UPDATE が来ても、同じ注文に紐づかず更新が同じ行にまとまらないため)。今回 cityHash64(record_source, order_id) が機能したのは、ソースと order_id という「その注文を通して変わらない値」から決まるキーだったからです。(record_source, order_id) の複合キーでも同じ性質が得られます。
なお ClickPipes が自動で付ける _peerdb_synced_at は Data Vault の Load Date に相当します。「複数ソースを統合する」という同じ問題に対して、独立に作られた仕組みが似た形で実装されているのは面白いところです。
7. まとめ
| ポイント | 内容 |
|---|---|
| 集約できるか | 複数 ClickPipe → 1 ClickHouse は公式サポート。各ソースは専用の database / テーブルに取り込まれる(実測で各 10 万行) |
| 宛先テーブル | 1 つの宛先テーブルを 2 pipe で共有はできない。統合テーブルは ClickHouse 側で別に作る |
| PK だけの統合 |
order_id だけを dedup キーにすると、ソースの片方が残らない(実測で JP が 0 行) |
| 直し方 | 「どのソース由来か」を表す識別子(record_source)をキーに含める。record_source + order_id、またはそのハッシュ(hk) |
| CDC の UPDATE | ソース識別子を含むキーなら、同じ注文の行へ正しく反映され、別ソースの同じ order_id には波及しない |
| UNION でまとめる | クエリ時にまとめるのは手軽だが、FINAL のコストが毎回かかり MV にもできない。頻繁・大規模なら物理統合テーブル向き |
複数の Postgres を 1 つの ClickHouse に集約するのは、公式機能の組み合わせで素直にできました。引っかかるのは「集約した統合テーブルのキーをどう設計するか」だけで、ここを DWH の多ソース統合と同じく「ソース識別子を含むキー」にすれば、両ソースが共存し、CDC の更新も正しく反映できます。
参考
- Postgres managed by ClickHouse is now in beta
- ClickPipes for Postgres FAQ | ClickHouse Docs
- Ingesting data from Postgres to ClickHouse (using CDC) | ClickHouse Docs
- ReplacingMergeTree | ClickHouse Docs
-
Kimball と Data Vault はいずれも DWH(データウェアハウス)の設計方法論。サロゲートキー=ソースの自然キー(連番 PK 等)の代わりに倉庫側で振る人工キー。Record Source=その行がどのソース由来かを示す列。Hash Key=業務キー(+ソース)から作るハッシュ値で、結合や一意化に使う。詳しくは 6 章。 ↩


