0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめに

これまでのLevelでは、ストリーミングテーブル(ST)を使った追記型のデータ処理を学んできました。ログやイベントデータのように、新しいデータが追加されていく場合はこれで十分です。

しかし、マスターデータ(顧客情報、商品情報など)は違います:

  • 顧客が住所を変更した → 更新が必要
  • 顧客が退会した → 削除が必要
  • 変更履歴を残したい → 履歴管理が必要

このような「変更データの取り込み(CDC: Change Data Capture)」を簡単に実現するのがAUTO CDCです。

Lakeflow SDP入門:基礎から実践まで

本記事は、SDPを段階的に学ぶ学習パス「Lakeflow SDP入門:基礎から実践まで」の最終回です。

Level タイトル 所要時間 学ぶ概念
1 SQLだけで始めるLakeflow SDP 30分 MV、パイプライン
2 Lakeflow SDPでデータ品質を守るエクスペクテーション 30分 エクスペクテーション
3 Lakeflow SDPの増分処理とストリーミングテーブル 45分 ST、増分処理
4 Lakeflow SDPのフローを理解する 45分 フロー、append_flow
5 Lakeflow SDPのAUTO CDCでマスターデータ同期(本記事) 60分 AUTO CDC、SCD

この記事で学ぶこと

  • CDCとは何か、なぜ必要か
  • AUTO CDCの基本的な使い方
  • SCD Type 1(上書き)とSCD Type 2(履歴保持)の違い
  • マスターデータの同期パターン

前提条件

  • Level 4までを完了している、またはSTとフローの基本を理解している
  • SQLの基本が書ける

CDCとは

Change Data Capture

CDC(Change Data Capture)は、ソースシステムで発生したデータの変更(INSERT、UPDATE、DELETE)をキャプチャして、ターゲットシステムに反映する技術です。

従来のCDC処理の課題

CDCイベントを正しく処理するのは、実は非常に複雑です:

  1. 順序の保証: イベントが順番通りに届くとは限らない
  2. 重複の処理: 同じイベントが複数回届くことがある
  3. DELETE処理: 削除されたレコードをどう扱うか
  4. 履歴管理: 変更前の値を残すかどうか

従来は、これらを自前で実装する必要がありました:

-- 従来のCDC処理(疑似コード)
MERGE INTO target t
USING (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY timestamp DESC) as rn
    FROM cdc_events
) s
ON t.id = s.id AND s.rn = 1
WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
WHEN MATCHED AND s.operation = 'UPDATE' THEN UPDATE SET ...
WHEN NOT MATCHED AND s.operation != 'DELETE' THEN INSERT ...

AUTO CDCならシンプル

AUTO CDCを使えば、上記の複雑な処理がこれだけになります:

CREATE STREAMING TABLE customers;

CREATE FLOW sync_customers AS
AUTO CDC INTO customers
FROM STREAM cdc_source
KEYS (id)
SEQUENCE BY timestamp;
  • 順序: SEQUENCE BYで自動的に正しい順序で処理
  • 重複: 同じキーの古いイベントは自動的にスキップ
  • DELETE: APPLY AS DELETE WHENで条件指定
  • 履歴: STORED AS SCD TYPE 2で自動的に履歴管理

SCD Type 1とType 2

AUTO CDCでは、2種類の履歴管理方式をサポートしています。

SCD Type 1(上書き)

最新の状態のみを保持します。変更があれば上書きされ、過去の値は残りません。

【初期状態】
id=1, name="田中太郎", address="東京"

【住所変更後】
id=1, name="田中太郎", address="大阪"  ← 東京の履歴は消える

用途: 最新の状態だけが必要な場合(現在の顧客情報など)

SCD Type 2(履歴保持)

変更のたびに新しい行を追加し、過去の状態も保持します。

【初期状態】
id=1, name="田中太郎", address="東京", __START_AT=2024-01-01, __END_AT=null

【住所変更後】
id=1, name="田中太郎", address="東京", __START_AT=2024-01-01, __END_AT=2024-06-15  ← 終了
id=1, name="田中太郎", address="大阪", __START_AT=2024-06-15, __END_AT=null        ← 新規

用途: 変更履歴が必要な場合(監査、時点分析など)

比較

項目 Type 1 Type 2
過去の値 保持しない 保持する
レコード数 1レコード/キー 複数レコード/キー
ストレージ 少ない 多い
クエリ シンプル 時点指定が必要
用途 最新状態の参照 履歴分析、監査

ハンズオン: AUTO CDCでマスターデータを同期する

顧客マスターの変更をAUTO CDCで処理してみましょう。

シナリオ

ソースシステムから顧客の変更イベント(INSERT/UPDATE/DELETE)が届きます。これをDatabricksのテーブルに同期します。

Step 1: パイプラインとボリュームを準備

-- ボリュームを作成(SQLエディタで実行)
CREATE VOLUME IF NOT EXISTS workspace.sdp.cdc_volume;

ノートブックでディレクトリを作成:

dbutils.fs.mkdirs("/Volumes/workspace/sdp/cdc_volume/customers/")
print("ディレクトリを作成しました")

Step 2: 初期データ(INSERT)を作成

まず、3人の顧客を登録するCDCイベントを作成します。

# 初期データ: 3人の顧客をINSERT
initial_data = """id,name,email,address,operation,event_time
1,田中太郎,tanaka@example.com,東京都渋谷区,INSERT,2024-01-01T10:00:00
2,鈴木花子,suzuki@example.com,大阪府大阪市,INSERT,2024-01-01T10:01:00
3,佐藤次郎,sato@example.com,愛知県名古屋市,INSERT,2024-01-01T10:02:00
"""

dbutils.fs.put("/Volumes/workspace/sdp/cdc_volume/customers/batch1_insert.csv", initial_data, overwrite=True)
print("初期データを作成しました(3件のINSERT)")

Screenshot 2025-12-22 at 10.28.10.png

Step 3: AUTO CDC(SCD Type 1)でテーブルを作成

パイプラインエディタで:

-- CDCソースを取り込むストリーミングテーブル
CREATE STREAMING TABLE customers_cdc_raw;

CREATE FLOW ingest_cdc AS
INSERT INTO customers_cdc_raw BY NAME
SELECT *
FROM STREAM read_files(
    '/Volumes/workspace/sdp/cdc_volume/customers/',
    format => 'csv',
    header => 'true',
    schema => 'id INT, name STRING, email STRING, address STRING, operation STRING, event_time TIMESTAMP'
);

-- AUTO CDCでマスターテーブルに同期(SCD Type 1)
CREATE STREAMING TABLE customers;

CREATE FLOW sync_customers AS
AUTO CDC INTO customers
FROM STREAM customers_cdc_raw
KEYS (id)
APPLY AS DELETE WHEN operation = 'DELETE'
SEQUENCE BY event_time
COLUMNS * EXCEPT (operation, event_time);

ポイント:

  • KEYS (id): 主キーを指定。同じIDのレコードは上書きされる
  • APPLY AS DELETE WHEN operation = 'DELETE': operation列が'DELETE'の場合は削除
  • SEQUENCE BY event_time: この列で順序を決定(古いイベントは無視)
  • COLUMNS * EXCEPT (...): 出力テーブルから除外する列

パイプラインを実行をクリックします。

Step 4: 結果を確認

実行後、customersテーブルを確認:

id name email address
1 田中太郎 tanaka@example.com 東京都渋谷区
2 鈴木花子 suzuki@example.com 大阪府大阪市
3 佐藤次郎 sato@example.com 愛知県名古屋市

Screenshot 2025-12-22 at 10.30.13.png

Step 5: UPDATE/DELETEイベントを追加

顧客情報の変更と削除をシミュレートします。

# 変更データ: UPDATE 2件、DELETE 1件
change_data = """id,name,email,address,operation,event_time
1,田中太郎,tanaka@example.com,神奈川県横浜市,UPDATE,2024-02-15T14:30:00
3,佐藤次郎,sato@example.com,愛知県名古屋市,DELETE,2024-02-20T09:00:00
2,鈴木花子,suzuki.new@example.com,大阪府大阪市,UPDATE,2024-03-01T11:00:00
"""

dbutils.fs.put("/Volumes/workspace/sdp/cdc_volume/customers/batch2_changes.csv", change_data, overwrite=True)
print("変更データを作成しました(UPDATE 2件、DELETE 1件)")

パイプラインを再実行します。

Step 6: 変更結果を確認

customersテーブルを確認:

id name email address
1 田中太郎 tanaka@example.com 神奈川県横浜市
2 鈴木花子 suzuki.new@example.com 大阪府大阪市

Screenshot 2025-12-22 at 10.31.55.png

  • id=1: 住所が「東京都渋谷区」→「神奈川県横浜市」に更新
  • id=2: メールが「suzuki@example.com」→「suzuki.new@example.com」に更新
  • id=3: 削除されてテーブルから消えた

これがSCD Type 1の動作です。最新の状態のみが保持されます。

ハンズオン: SCD Type 2で履歴を保持する

次に、変更履歴を残すSCD Type 2を試してみましょう。

Step 1: 履歴テーブルを追加

パイプラインに以下を追加:

-- AUTO CDCで履歴テーブルに同期(SCD Type 2)
CREATE STREAMING TABLE customers_history;

CREATE FLOW sync_customers_history AS
AUTO CDC INTO customers_history
FROM STREAM customers_cdc_raw
KEYS (id)
APPLY AS DELETE WHEN operation = 'DELETE'
SEQUENCE BY event_time
COLUMNS * EXCEPT (operation, event_time)
STORED AS SCD TYPE 2;

追加されたオプション:

  • STORED AS SCD TYPE 2: 履歴を保持するモード

Step 2: 実行して確認

パイプラインを実行をクリックします。

customers_historyテーブルを確認:

id name email address __START_AT __END_AT
1 田中太郎 tanaka@example.com 東京都渋谷区 2024-01-01 10:00:00 2024-02-15 14:30:00
1 田中太郎 tanaka@example.com 神奈川県横浜市 2024-02-15 14:30:00 null
2 鈴木花子 suzuki@example.com 大阪府大阪市 2024-01-01 10:01:00 2024-03-01 11:00:00
2 鈴木花子 suzuki.new@example.com 大阪府大阪市 2024-03-01 11:00:00 null
3 佐藤次郎 sato@example.com 愛知県名古屋市 2024-01-01 10:02:00 2024-02-20 09:00:00

Screenshot 2025-12-22 at 10.33.22.png

自動的に追加される列:

  • __START_AT: このレコードが有効になった時刻
  • __END_AT: このレコードが無効になった時刻(nullは現在有効)

Step 3: 履歴テーブルのクエリ

現在有効なレコードのみ取得:

SELECT * FROM customers_history
WHERE __END_AT IS NULL;
id name email address __START_AT __END_AT
1 田中太郎 tanaka@example.com 神奈川県横浜市 2024-02-15T14:30:00.000+00:00 -
2 鈴木花子 suzuki.new@example.com 大阪府大阪市 2024-03-01T11:00:00.000+00:00 -

特定時点のレコードを取得:

-- 2024年2月1日時点の顧客情報
SELECT * FROM customers_history
WHERE __START_AT <= '2024-02-01'
  AND (__END_AT IS NULL OR __END_AT > '2024-02-01');
id name email address __START_AT __END_AT
1 田中太郎 tanaka@example.com 東京都渋谷区 2024-01-01T10:00:00.000+00:00 2024-02-15T14:30:00.000+00:00
2 鈴木花子 suzuki@example.com 大阪府大阪市 2024-01-01T10:01:00.000+00:00 2024-03-01T11:00:00.000+00:00
3 佐藤次郎 sato@example.com 愛知県名古屋市 2024-01-01T10:02:00.000+00:00 2024-02-20T09:00:00.000+00:00

AUTO CDCの構文詳細

基本構文

CREATE STREAMING TABLE ターゲットテーブル;

CREATE FLOW フロー名 AS
AUTO CDC INTO ターゲットテーブル
FROM STREAM ソーステーブル
KEYS (主キー列, ...)
[APPLY AS DELETE WHEN 削除条件]
SEQUENCE BY 順序列
[COLUMNS 列指定]
[STORED AS SCD TYPE {1 | 2}];

オプション詳細

オプション 必須 説明
KEYS 主キー列(複数可)
SEQUENCE BY 順序を決定する列(タイムスタンプなど)
APPLY AS DELETE WHEN 削除条件(省略時は削除なし)
COLUMNS 出力する列の指定
STORED AS SCD TYPE 1(デフォルト)または2

COLUMNS指定のパターン

-- すべての列
COLUMNS *

-- 特定の列を除外
COLUMNS * EXCEPT (operation, event_time, _rescued_data)

-- 特定の列のみ
COLUMNS id, name, email

実践パターン

パターン1: 複合キー

CREATE FLOW sync_order_items AS
AUTO CDC INTO order_items
FROM STREAM order_items_cdc
KEYS (order_id, item_id)  -- 複合キー
SEQUENCE BY updated_at;

パターン2: ソフトデリート(論理削除)

物理的に削除せず、フラグで管理する場合:

-- 削除フラグを追加(APPLY AS DELETE WHENを使わない)
CREATE FLOW sync_customers AS
AUTO CDC INTO customers
FROM STREAM customers_cdc
KEYS (id)
SEQUENCE BY event_time
COLUMNS id, name, email, address, 
        CASE WHEN operation = 'DELETE' THEN true ELSE false END AS is_deleted;

パターン3: SCD Type 2 + 現在ビュー

履歴テーブルと、現在の状態を見るビューを組み合わせる:

-- 履歴テーブル
CREATE STREAMING TABLE customers_history;

CREATE FLOW sync_history AS
AUTO CDC INTO customers_history
FROM STREAM customers_cdc
KEYS (id)
SEQUENCE BY event_time
STORED AS SCD TYPE 2;

-- 現在の状態を見るビュー
CREATE MATERIALIZED VIEW customers_current AS
SELECT * EXCEPT (__START_AT, __END_AT)
FROM customers_history
WHERE __END_AT IS NULL;

注意点とベストプラクティス

1. SEQUENCE BY列は必ず指定

-- ❌ 間違い: SEQUENCE BYがない
CREATE FLOW sync AS
AUTO CDC INTO target
FROM STREAM source
KEYS (id);

-- ✅ 正しい: SEQUENCE BYで順序を保証
CREATE FLOW sync AS
AUTO CDC INTO target
FROM STREAM source
KEYS (id)
SEQUENCE BY event_time;

順序が保証されないと、古いイベントが新しいイベントを上書きする可能性があります。

2. ソースにoperation列がない場合

すべてUPSERT(INSERT or UPDATE)として処理されます。DELETEは処理されません。

-- operation列がない場合、APPLY AS DELETE WHENは使えない
CREATE FLOW sync AS
AUTO CDC INTO target
FROM STREAM source
KEYS (id)
SEQUENCE BY updated_at;
-- 削除は処理されない(UPSERTのみ)

3. SCD Type 2のストレージ考慮

SCD Type 2は変更のたびにレコードが増えるため、頻繁に更新されるテーブルではストレージが急増する可能性があります。

-- 対策: 古い履歴を定期的にアーカイブ
-- (パイプライン外で実行)
DELETE FROM customers_history
WHERE __END_AT < DATEADD(YEAR, -2, CURRENT_DATE());

4. キー列は変更しない

AUTO CDCはKEYS列でレコードを識別します。ソース側でキー値を変更すると、別レコードとして扱われます。

-- ❌ ソース側でidを変更すると...
id=1, name="田中" → id=100, name="田中"

-- 結果: id=1とid=100の2レコードが存在

5. _rescued_data列

スキーマ外の列があると_rescued_dataに格納されます。通常は除外します。

COLUMNS * EXCEPT (operation, event_time, _rescued_data)

まとめ

今日できるようになったこと

  • CDCの概念とAUTO CDCの使い方を理解した
  • SCD Type 1(上書き)とType 2(履歴保持)を使い分けられる
  • マスターデータの同期パイプラインを構築できる

AUTO CDCの価値

課題 AUTO CDCによる解決
イベント順序の処理 SEQUENCE BYで自動処理
DELETE処理 APPLY AS DELETE WHENで宣言的に指定
履歴管理 SCD TYPE 2で自動的に履歴保持
重複イベント 同じキー・古いシーケンスは自動スキップ

学習パス完了!

これで「Lakeflow SDP入門:基礎から実践まで」の全5レベルが完了しました。

Level 学んだこと
1 MV、パイプラインの基本
2 エクスペクテーションによるデータ品質管理
3 STと増分処理
4 フローによる柔軟なデータ統合
5 AUTO CDCによるマスターデータ同期

これらを組み合わせることで、本格的なデータパイプラインを構築できます。

次のステップ

さらに学びたい方は:

参考リンク

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?