0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

1. はじめに

ここ最近、ClickHouse の Managed Postgres(公開ベータの「Postgres managed by ClickHouse」)と ClickPipes CDC(Postgres の変更をニアリアルタイムで ClickHouse へ複製する仕組み)を、遅延・HTAP・dedup といった角度で単一ソース構成で検証してきました。今回はその延長で、複数の Postgres を 1 つの ClickHouse に集約(fan-in)する構成を試します。

「別々に動いている 2 つの業務 DB を、1 つの DWH(データウェアハウス)などにまとめて横断分析したい」というのはよくある話です。今回はソースを 2 台の Postgres に絞り、2 つのソースをきちんと集約して横断クエリを打てる形まで持っていきます。

1.1. 今回の検証ゴール

# ゴール OK条件
G1 2 台の Postgres を 1 つの ClickHouse に集約できる 2 本の ClickPipe で 2 ソースが ClickHouse 側に取り込まれ、データがそろう
G2 集約した「統合テーブル」を正しく作れる ソース識別子をキーに入れて両ソースが共存し、片方の CDC UPDATE がもう片方に波及せず反映される

1.2. 結論(先出し)

  • 複数の ClickPipe を 1 つの ClickHouse サービスに集約するのは公式にサポートされています。各ソースはそれぞれ専用の database / テーブルに取り込まれます
  • 2 ソースを 1 つの統合テーブルにまとめるときは、「どのソースから来たか」を表す識別子をキーに含める必要があります。含めないと、ReplacingMergeTree の重複排除でソースの片方が残りません
  • このキー設計は DWH で多ソースを統合するときの定石(Kimball のサロゲートキー / Data Vault の Record Source + Hash Key1 と同じ考え方です。ソース識別子を含めたキーなら、CDC の UPDATE も同じ注文の行へ反映されます
  • 横断クエリは UNION でも書けますが、クエリのたびに両テーブルを読んで重複排除(FINAL)する分のコストがかかります。頻繁・大規模に回すなら物理統合テーブルの方が向きます

2. 検証環境

項目
ソース DB ClickHouse Managed Postgres ×2(PostgreSQL 18.4、Tokyo / ap-northeast-1)
ソース A pg_jp
ソース B pg_us
ターゲット ClickHouse Cloud(最小構成)
CDC ClickPipes Postgres CDC ×2(Sync interval = 10 秒)

取り込み先の ClickHouse database をソースごとに分け(jp / us)、その database 名をソース識別子として使います。理由は 3 章・4 章で説明します。

3. 集約の仕組み

3.1. ClickPipes CDC と ReplacingMergeTree

ClickPipes の Postgres CDC は、ソースの変更を Sync interval ごとにポーリングして ClickHouse へ流し込みます(既定 60 秒・推奨下限 10 秒。Controlling the syncing of a Postgres ClickPipe)。宛先テーブルは ReplacingMergeTree 系のエンジンで自動生成されます。

ReplacingMergeTree は、同じキー(ORDER BY で指定した列)の行を、最も新しいバージョンの 1 行にまとめてくれるエンジンです。CDC ではこれを次のように使います。

  • INSERT / UPDATE は、_peerdb_version(バージョンを表す値)が異なる新しい行として追加されます(その場で上書きはしない)
  • DELETE は、_peerdb_is_deleted 列でマークした行として追加されます
  • ClickHouse は裏で複数のデータのかたまりを定期的に統合(マージ)し、そのときに**同じキーの行を、_peerdb_version が最大の 1 行に重複排除(dedup)**します
  • マージは非同期なので、クエリ時に最新の 1 行だけを読みたいときは FINAL を付けます(マージ完了を待たずに重複排除済みの結果が得られます)

実際に pipe が作った宛先テーブルの定義は次のとおりでした。

CREATE TABLE jp.orders
(
    `order_id` Int64,
    `customer_id` Int64,
    `amount` Decimal(12, 2),
    `region` String,
    `updated_at` DateTime64(6),
    `_peerdb_synced_at` DateTime64(9) DEFAULT now64(),
    `_peerdb_is_deleted` UInt8,
    `_peerdb_version` UInt64
)
ENGINE = SharedReplacingMergeTree('/clickhouse/tables/{uuid}/{shard}', '{replica}', _peerdb_version)
PRIMARY KEY order_id
ORDER BY order_id

ここで重要なのは、ORDER BY order_id が重複排除のキーを兼ねている点です(ReplacingMergeTree 公式 Docs)。この「dedup キー = ORDER BY」という性質が、今回のポイントになります。

3.2. 複数ソースをどう取り込むか

ClickPipes の FAQ には、複数ソースの集約について次の記述があります(ClickPipes for Postgres FAQ)。

You can create separate ClickPipes from different Postgres instances (including across AWS regions) into one or more ClickHouse services.

つまり、異なる Postgres インスタンスから別々の ClickPipe を作り、1 つの ClickHouse サービスに集約できます。一方で「1 つの宛先テーブルを 2 つの pipe で共有する」という使い方は想定されておらず、各 pipe は自分の宛先テーブルを作成・管理します。したがって構成はこうなります。

2 ソースはそれぞれ専用のテーブル(jp.orders / us.orders)に取り込まれ、横断分析のための「統合テーブル」は ClickHouse 側で別に作ることになります。この統合テーブルをどう作るかが、4 章・5 章の本題です。

4. 検証データと取り込み

4.1. わざと PK を重複させる

両ソースに同じスキーマを置き、各ソースに 10 万件ずつorder_id両方とも 1〜100000 で採番した注文データを入れました。独立して動く 2 つの業務 DB はそれぞれ 1 から採番するため、統合したときに order_id が被ります。今回はそれを意図的に作ります。

-- 両ソース(pg_jp / pg_us)に同一適用
CREATE SCHEMA app;
CREATE TABLE app.orders (
    order_id    bigint PRIMARY KEY,
    customer_id bigint,
    amount      numeric(12,2),
    region      text,
    updated_at  timestamptz DEFAULT clock_timestamp()
);
-- 各ソースに同じ order_id 1..100000 を投入(pg_jp は region='JP'、pg_us は region='US')
INSERT INTO app.orders (order_id, customer_id, amount, region)
SELECT g, ((g - 1) % 1000) + 1, round((random() * 1000)::numeric, 2), 'JP'
FROM generate_series(1, 100000) AS g;

両ソースとも 100,000 行、order_id は 1〜100000 でレンジが完全に重なります。なお region は注文データの一属性であって、ソースを区別する識別子ではありません(後述のキー設計では、取り込み元の database をソース識別子として使います)。

4.2. 取り込み先はソースごとに database を分ける

ClickPipe は、宛先テーブル名にソースのスキーマ名をプレフィックスとして付けるかを選べます(このトグルは既定ではオン。app.ordersapp_orders)。ただし複数ソースを同じ database に同名で取り込もうとすると衝突します(実際、宛先を同じにしたら table cdc_probe exists and is not empty で弾かれました)。

ClickPipe の宛先テーブル設定(スキーマ名プレフィックスの切り替えと宛先テーブル名)

そこで今回は、取り込み先 database をソースごとに分けjp / us)、database 名そのものをソース識別子として使いました。テーブル名のリネームに頼らず衝突を避けられて、構成もすっきりします。

これで jp.orders(10 万行)と us.orders(10 万行)が、ソースごとに分かれて取り込まれました。両 pipe とも Running で、Sync は 10 秒前後で 1 バッチを処理しています。

pipe_jp(Running) pipe_us(Running)
pipe_jp Metrics pipe_us Metrics

5. 統合テーブルをどう作るか

5.1. 取り込みの確認

テーブル 行数 region
jp.orders 100,000 JP
us.orders 100,000 US

ここまでで G1(2 ソースを ClickHouse に集約する)は達成です。2 ソースは別 database の別テーブルに、それぞれ 10 万行そのまま入っています。問題が出るのは、この 2 つを 1 つの統合テーブルにまとめる段です。

5.2. PK だけをキーに統合すると、片方のソースが残らない

横断分析のために「全体の orders を 1 テーブルにまとめたい」と考えるのは自然です。そして pipe が作るテーブルが ORDER BY order_id なので、統合テーブルも同じノリで order_id をキーに作りがちです。

-- 素朴な統合: dedup キー = order_id 単独
CREATE TABLE analytics.orders_naive
(
    order_id Int64, customer_id Int64, amount Decimal(12,2), region String,
    updated_at DateTime64(6), _peerdb_version UInt64
)
ENGINE = ReplacingMergeTree(_peerdb_version)
ORDER BY order_id;

INSERT INTO analytics.orders_naive SELECT order_id, customer_id, amount, region, updated_at, _peerdb_version FROM jp.orders;
INSERT INTO analytics.orders_naive SELECT order_id, customer_id, amount, region, updated_at, _peerdb_version FROM us.orders;

結果はこうなりました。

クエリ 行数
SELECT count() FROM analytics.orders_naive 200,000
SELECT count() FROM analytics.orders_naive FINAL 100,000

挿入直後は 200,000 行ありますが、重複排除を効かせた FINAL では 100,000 行に半減します。しかも region の内訳を見ると、

region 行数(FINAL)
US 100,000
JP 0

JP 側が 1 行も残りませんでした。 order_id を共有する JP と US の行が「同じ行の新しいバージョン」と判定され、後から入れた US が JP を上書きした形です。バージョン(_peerdb_version)が両ソースとも 0(初回スナップショット)で並んだため、後勝ちになっています。

「2 つのソースを 1 つにまとめたら、片方のデータが残らなかった」という、複数ソース集約で踏みやすいつまずきです。

5.3. ソース識別子をキーに入れて、両方を残す

この対応案は「統合テーブルに『どのソースから来たか』を表す列を持たせ、それをキーの先頭に入れる」というものです。ここでは取り込み元の database 名(pg_jp / pg_us)を record_source 列として持たせます。さらに、record_sourceorder_id から hk(ハッシュキー。2 つの値から一意な値を作るハッシュ) を作り、これを ORDER BY(= dedup キー)にします。

CREATE TABLE analytics.orders_dv
(
    order_id Int64, customer_id Int64, amount Decimal(12,2), region String,
    updated_at DateTime64(6),
    record_source LowCardinality(String),                 -- どのソース由来か
    hk UInt64,                                            -- record_source + order_id のハッシュ
    _peerdb_version UInt64
)
ENGINE = ReplacingMergeTree(_peerdb_version)
ORDER BY hk;

INSERT INTO analytics.orders_dv
SELECT order_id, customer_id, amount, region, updated_at,
       'pg_jp', cityHash64('pg_jp', order_id), _peerdb_version
FROM jp.orders;
INSERT INTO analytics.orders_dv
SELECT order_id, customer_id, amount, region, updated_at,
       'pg_us', cityHash64('pg_us', order_id), _peerdb_version
FROM us.orders;

ORDER BY (record_source, order_id) という複合キーでも同じことができますが、ここでは後述の Data Vault の Hash Key に寄せてハッシュ 1 列にしました。結果は次のとおりです。

確認項目 結果
行数(FINAL) 200,000
hk の異なり数(FINAL) 200,000(衝突なし)
pg_jp / pg_us の内訳 各 100,000

order_id が被っていても、ソースが違えば hk が変わるので、両ソースの 10 万行がそのまま共存します。

5.4. CDC の UPDATE がこのキーで正しく扱えるか

統合テーブルが「集約した瞬間だけ」正しくても意味がありません。ソース側で UPDATE が起きたとき、統合テーブルの該当行が正しく更新されるかを確認します。

まず pg_jp の order_id=1 を更新します。

-- pg_jp 側で実行
UPDATE app.orders SET amount = 999999.99, updated_at = clock_timestamp() WHERE order_id = 1;

この更新は、約 10 秒後に jp.ordersより大きい _peerdb_version の新しい行として届きました。

order_id amount _peerdb_version
1 941.22 0
1 999999.99 1781084051091454551

FINAL は最新(999999.99)を返します。バージョンが 0 から LSN ベースの大きな値に増えており、UPDATE が新しいバージョンとして届いていることが分かります。

この更新を orders_dv に取り込んだうえで order_id=1 を見ると、

record_source order_id amount _peerdb_version
pg_jp 1 999999.99 1781084051091454551
pg_us 1 854.31 0
  • 別ソースの同じ order_id=1両方とも残るhk が別なので重複排除されない)
  • pg_jp の UPDATE は pg_jp の order_id=1 の行だけに反映される(999999.99)
  • pg_us の order_id=1まったく影響を受けない(854.31 のまま)

ソースが違えば衝突せず、CDC の UPDATE も同じ注文の行へ正しく反映される、という形になりました。これで G2 も達成です。

5.5. 統合テーブルを作らず、UNION でまとめる場合

物理的な統合テーブルを作らず、jp.ordersus.orders をクエリ時に UNION でまとめる手もあります。

CREATE VIEW analytics.orders_union AS
SELECT order_id, customer_id, amount, region, updated_at FROM jp.orders FINAL
UNION ALL
SELECT order_id, customer_id, amount, region, updated_at FROM us.orders FINAL;

別テーブルのままなので order_id の衝突は起きず、SELECT count() FROM analytics.orders_union は 200,000 を返します。UNION ALL は 2 つのテーブルをそれぞれ読んで連結するだけですが、重複排除のために各テーブルへ付けた FINAL の分、クエリのたびに両テーブルを読んで dedup するコストがかかります。公式 Docs にも、FINAL は「通常はマージ時に行われる処理をクエリ時にメモリ上で行うため、追加の計算リソースとメモリを要する」とあります(FROM 句の FINAL 修飾子 | ClickHouse Docs)。さらに、UNION を含むクエリは Materialized View にできないという制約もあります(ClickHouse #17058)。

そのため UNION は「たまに横断クエリを打つ」用途には手軽ですが、横断分析を頻繁・大規模に回すなら、5.3 章のようにソース識別子を含むキーで物理統合テーブルを持つ方が向きます。

6. 考察

6.1. PK の重複は「複数ソースの統合キー設計」の問題

5.2 章の現象そのものは「ReplacingMergeTree の dedup キーは ORDER BY」という仕様の素直な帰結です。独立した 2 つの Postgres は別々に 1 から採番するので、order_id という値の空間が被り、order_id だけをキーにした統合テーブルでは片方が残りません。

これは ClickHouse 固有の話というより、複数のソースシステムを 1 つの統合層にまとめるときのキー設計という、DWH では古くからあるテーマです。一例をあげると次のような対応があります。

流派 キー設計 今回との対応
Kimball(ディメンショナルモデル) ソースの自然キーをそのまま使わず、DWH側でサロゲートキーを採番して一意化する(Surrogate Keys / Kimball Group ソース識別子+order_id から一意なキーを作る
Data Vault 2.0 全行に Record Source(どのソース由来か)+ Load Date + Hash Key(業務キー由来) を持たせる(Data Vault on Amazon Redshift / AWS record_source が Record Source、cityHash64(record_source, order_id) が Hash Key

共通する原則は「ソースの自然キー(連番 PK)を、全ソースで一意なキーとして使わない。サロゲート、または『業務キー+ソース識別子』で一意化する」です。5.3 章で record_source を足したのは、この定石を ClickHouse の ORDER BY に落としたものでした。

6.2. ClickHouse でキーをどう選ぶか

DWH 一般論に加え、ClickHouse の ReplacingMergeTree を利用していることで、もう 1 つ条件が追加になります。dedup キー(= ORDER BY)に使う列は、その注文を通して変わらない値でなければなりません。途中で変わる列をキーに入れると、同じ注文が別物として扱われ、いつまでも 1 行にまとまりません。

このため、行を入れるたびに採番するような「中身と無関係なサロゲートキー」は、ClickHouse × CDC の dedup キーには向きません(CDC で UPDATE が来ても、同じ注文に紐づかず更新が同じ行にまとまらないため)。今回 cityHash64(record_source, order_id) が機能したのは、ソースと order_id という「その注文を通して変わらない値」から決まるキーだったからです。(record_source, order_id) の複合キーでも同じ性質が得られます。

なお ClickPipes が自動で付ける _peerdb_synced_at は Data Vault の Load Date に相当します。「複数ソースを統合する」という同じ問題に対して、独立に作られた仕組みが似た形で実装されているのは面白いところです。

7. まとめ

ポイント 内容
集約できるか 複数 ClickPipe → 1 ClickHouse は公式サポート。各ソースは専用の database / テーブルに取り込まれる(実測で各 10 万行)
宛先テーブル 1 つの宛先テーブルを 2 pipe で共有はできない。統合テーブルは ClickHouse 側で別に作る
PK だけの統合 order_id だけを dedup キーにすると、ソースの片方が残らない(実測で JP が 0 行)
直し方 「どのソース由来か」を表す識別子(record_source)をキーに含める。record_sourceorder_id、またはそのハッシュ(hk
CDC の UPDATE ソース識別子を含むキーなら、同じ注文の行へ正しく反映され、別ソースの同じ order_id には波及しない
UNION でまとめる クエリ時にまとめるのは手軽だが、FINAL のコストが毎回かかり MV にもできない。頻繁・大規模なら物理統合テーブル向き

複数の Postgres を 1 つの ClickHouse に集約するのは、公式機能の組み合わせで素直にできました。引っかかるのは「集約した統合テーブルのキーをどう設計するか」だけで、ここを DWH の多ソース統合と同じく「ソース識別子を含むキー」にすれば、両ソースが共存し、CDC の更新も正しく反映できます。

参考

  1. KimballData Vault はいずれも DWH(データウェアハウス)の設計方法論。サロゲートキー=ソースの自然キー(連番 PK 等)の代わりに倉庫側で振る人工キー。Record Source=その行がどのソース由来かを示す列。Hash Key=業務キー(+ソース)から作るハッシュ値で、結合や一意化に使う。詳しくは 6 章。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?