1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Lakeflow SDP入門者の鬼門:ストリーミングテーブルとマテリアライズドビューを完全理解する

Posted at

はじめに

Lakeflow Spark宣言型パイプライン(SDP)を学び始めると、最初にぶつかる壁があります。それがストリーミングテーブル(ST)マテリアライズドビュー(MV)の違いです。

「どっちもテーブルを作るんでしょ?何が違うの?」
「ストリーミングって書いてあるけど、バッチ処理でも使えるの?」
「どっちを使えばいいの?」

こうした疑問を持つのは当然です。本記事では、STとMVの実態動作原理使い分け、そしてよくあるつまづきポイントを徹底解説します。

1. まず結論:STとMVの決定的な違い

最初に結論を示します。

項目 ストリーミングテーブル(ST) マテリアライズドビュー(MV)
データの読み方 増分(新しいデータのみ) 全件(毎回フルスキャン)
内部の実態 Delta Table + チェックポイント Delta Table + クエリ定義
更新時の動作 前回の続きから処理 全データを再計算
適したユースケース データ取り込み、CDC、ログ蓄積 集計、結合、変換
ソースの制約 Append-onlyのソース向き 任意のソース

一言でまとめると:

  • ストリーミングテーブル = 増分処理でデータを蓄積するテーブル
  • マテリアライズドビュー = 毎回再計算する実体化ビュー

2. ストリーミングテーブル(ST)の実態と動作原理

2.1 STの実態:Delta Table + チェックポイント

ストリーミングテーブルは、内部的には以下の2つで構成されています。

  1. Delta Table: 実際のデータが格納される場所
  2. チェックポイント: 「前回どこまで処理したか」を記録するメタデータ

2.2 STの動作原理:増分処理

STの最大の特徴は増分処理です。パイプラインを実行するたびに、前回処理した「続き」から処理を開始します。

【1回目の実行】
ソースデータ: [A, B, C, D, E]
処理対象:     [A, B, C, D, E]  ← 全件処理
チェックポイント: "Eまで処理済み"

【2回目の実行】(新しいデータ F, G が追加された場合)
ソースデータ: [A, B, C, D, E, F, G]
処理対象:     [F, G]  ← 新規分のみ処理
チェックポイント: "Gまで処理済み"

これにより、大量のデータがあっても効率的に処理できます。

2.3 STの重要なポイント:STREAMキーワード

STから別のSTにデータを流す場合、必ずSTREAMキーワードを使う必要があります。

-- ✅ 正しい:STREAMキーワードを使用
CREATE FLOW my_flow AS
INSERT INTO downstream_st BY NAME
SELECT * FROM STREAM upstream_st;

-- ❌ 間違い:STREAMキーワードがない
CREATE FLOW my_flow AS
INSERT INTO downstream_st BY NAME
SELECT * FROM upstream_st;  -- これはバッチ読み取りになる

Pythonでも同様です:

# ✅ 正しい:readStreamを使用
@dp.append_flow(target="downstream_st")
def flow():
    return spark.readStream.table("upstream_st")

# ❌ 間違い:readを使用
@dp.append_flow(target="downstream_st")
def flow():
    return spark.read.table("upstream_st")  # バッチ読み取りになる

3. マテリアライズドビュー(MV)の実態と動作原理

3.1 MVの実態:Delta Table + クエリ定義

マテリアライズドビューも内部的にはDelta Tableですが、STとは異なりチェックポイントを持ちません。代わりに、クエリ定義を保持しています。

  • Delta Table: 計算結果が格納される場所
  • クエリ定義: SELECT文などの変換ロジック

3.2 MVの動作原理:フル再計算

MVはパイプライン実行のたびに、ソースデータ全体を再スキャンして結果を計算し直します。

【1回目の実行】
ソースデータ: [A, B, C, D, E]
処理対象:     [A, B, C, D, E]  ← 全件処理
結果テーブル: 集計結果など

【2回目の実行】(新しいデータ F, G が追加された場合)
ソースデータ: [A, B, C, D, E, F, G]
処理対象:     [A, B, C, D, E, F, G]  ← 全件再処理
結果テーブル: 新しい集計結果(全データ反映)

「それって非効率じゃない?」と思うかもしれませんが、集計や結合を含むクエリでは、増分処理が難しいケースが多いのです。

3.3 MVの重要なポイント:STREAMを使わない

MVではSTREAMキーワードを使いません。使うとエラーになります。

-- ✅ 正しい:通常のSELECT
CREATE OR REPLACE MATERIALIZED VIEW my_mv AS
SELECT * FROM source_table;

-- ❌ 間違い:STREAMを使用
CREATE OR REPLACE MATERIALIZED VIEW my_mv AS
SELECT * FROM STREAM source_table;  -- エラーになる

4. フローとは何か:STとMVの定義方法の違い

ここで、SDPの重要な概念であるフロー(Flow)について説明します。

4.1 MVとSTの定義方法の違い

マテリアライズドビュー:定義とデータソースが一体

-- これ1つで「テーブル定義」と「データの取得方法」が決まる
CREATE MATERIALIZED VIEW sales_summary AS
SELECT product_id, SUM(amount) as total
FROM raw_sales
GROUP BY product_id;

ストリーミングテーブル:「箱」と「データの流し込み方」が分離

-- Step 1: 箱だけ作る(中身は空)
CREATE STREAMING TABLE sales_events;

-- Step 2: データの流し込み方を定義(これがフロー)
CREATE FLOW ingest_sales AS
INSERT INTO sales_events BY NAME
SELECT * FROM STREAM read_files('/path/to/data');

フローとは「STにデータを入れる方法」を定義するものです。

4.2 なぜ分離されているのか

分離されていることで、以下が可能になります:

1. 複数のソースから1つのSTに流し込める

CREATE STREAMING TABLE all_logs;

-- ソースA
CREATE FLOW flow_a AS
INSERT INTO all_logs BY NAME
SELECT *, 'system_a' as source FROM STREAM read_files('/logs/a');

-- ソースB  
CREATE FLOW flow_b AS
INSERT INTO all_logs BY NAME
SELECT *, 'system_b' as source FROM STREAM read_files('/logs/b');

2. 流し込み方を変えられる(append_flow vs AUTO CDC)

これについては次のセクションで詳しく説明します。

4.3 2種類のフロー

SDPには主に2種類のフローがあります:

フロー種類 ターゲットSTへの操作 用途
append_flow / INSERT INTO 追記のみ ログ蓄積、イベント収集
AUTO CDC ... INTO UPSERT / DELETE マスターデータ同期

ここで重要なポイントがあります:

「STはappend-only」という説明は不正確です。

  • STの実態はDelta Table。Delta TableはUPDATE/DELETEができる
  • append-onlyなのはフローの種類(append_flow)であって、ST自体ではない

この違いを理解することが、CDC処理を理解する鍵です。

5. CDCパイプラインにおけるフローとストリーミングテーブル

CDC(Change Data Capture)処理は、SDPの強力なユースケースの1つです。ここでは、フローとSTがどのように連携するかを詳しく見ていきます。

5.1 CDCパイプラインの2段階構造

CDCパイプラインは「蓄積」と「適用」の2段階で構成されます。

全体の流れ:

  1. ソースシステム(MySQL/PostgreSQLなど)
  2. → Debezium等でキャプチャ
  3. クラウドストレージ(S3/ADLS/GCS)にJSONファイルとして保存
  4. 【第1段階:蓄積】 append_flowcustomers_cdc(ST)に追記
  5. 【第2段階:適用】 AUTO CDCcustomers(ST)にUPSERT/DELETE

第1段階のテーブル例(customers_cdc):

id name op ts
001 田中 INSERT 10:00
001 田中太郎 UPDATE 11:00
002 鈴木 INSERT 12:00
001 (null) DELETE 13:00

4つのイベントが4行として追記されます。

第2段階のテーブル例(customers):

id name
002 鈴木

AUTO CDCが適用され、最新状態のみが残ります(id=001は削除済み)。

5.2 第1段階:CDCイベントの蓄積(append_flow)

Debeziumなどから送られてくるCDCイベントは、以下のようなJSONです:

{"id": "001", "name": "田中",     "op": "INSERT", "ts": "2025-01-01 10:00:00"}
{"id": "001", "name": "田中太郎", "op": "UPDATE", "ts": "2025-01-01 11:00:00"}
{"id": "002", "name": "鈴木",     "op": "INSERT", "ts": "2025-01-01 12:00:00"}
{"id": "001", "name": null,       "op": "DELETE", "ts": "2025-01-01 13:00:00"}

これをappend_flowでSTに取り込みます:

-- CDCイベントを蓄積するST
CREATE STREAMING TABLE customers_cdc;

-- append_flowでイベントを追記
CREATE FLOW ingest_cdc AS
INSERT INTO customers_cdc BY NAME
SELECT * FROM STREAM read_files(
    '/Volumes/catalog/schema/cdc/customers',
    format => 'json'
);

この段階では、UPDATE/DELETEイベントも 「1つのレコード」として追記 されるだけです。customers_cdcテーブルには4行が入ります。

5.3 第2段階:最終テーブルへの適用(AUTO CDC)

蓄積されたCDCイベントを解釈して、最終テーブルに反映します:

-- 最終的な顧客テーブル
CREATE STREAMING TABLE customers;

-- AUTO CDCでイベントを適用
CREATE FLOW apply_changes
AS AUTO CDC INTO customers
FROM STREAM customers_cdc
KEYS (id)                              -- 主キー
SEQUENCE BY ts                         -- 順序を決めるカラム
APPLY AS DELETE WHEN op = 'DELETE'     -- DELETE条件
COLUMNS * EXCEPT (op, ts);             -- 除外するカラム

AUTO CDCは以下を自動的に行います:

  1. KEYS (id) で同じidのレコードを特定
  2. SEQUENCE BY ts でイベントの順序を判定(遅延データにも対応)
  3. INSERT → 新規行を追加
  4. UPDATE → 既存行を更新
  5. DELETE → 既存行を削除

結果、customersテーブルには最新の状態のみが反映されます。

5.4 なぜ2段階に分けるのか

「AUTO CDCで直接取り込めばいいのでは?」と思うかもしれません。2段階に分ける理由があります:

1. デバッグ・監査が容易

生のCDCイベントが残っているので、問題発生時に原因を追跡できます。

-- 「id=001に何が起きた?」を調査できる
SELECT * FROM customers_cdc 
WHERE id = '001' 
ORDER BY ts;

2. 再処理が可能

AUTO CDCの設定を間違えた場合、customersテーブルを削除して再作成すれば、customers_cdcから再度適用できます。

3. 複数の最終テーブルを作れる

同じCDCイベントから、異なる形式のテーブルを作れます:

-- 最新状態のみ(SCD Type 1)
CREATE FLOW apply_scd1
AS AUTO CDC INTO customers
FROM STREAM customers_cdc
...
STORED AS SCD TYPE 1;

-- 履歴を残す(SCD Type 2)
CREATE FLOW apply_scd2
AS AUTO CDC INTO customers_history
FROM STREAM customers_cdc
...
STORED AS SCD TYPE 2;

5.5 フローの種類と動作のまとめ

フロー SQL構文 Python 動作
append_flow CREATE FLOW ... AS INSERT INTO ... @dp.append_flow(target="...") ターゲットに行を追記
AUTO CDC CREATE FLOW ... AS AUTO CDC INTO ... dp.create_auto_cdc_flow(...) ターゲットにUPSERT/DELETE

append_flowの特徴

  • ターゲットSTには追記のみ
  • シンプルで高速
  • ログ、イベント、CDCイベントの蓄積に最適

AUTO CDCの特徴

  • ターゲットSTにUPSERT/DELETEを適用
  • CDCイベントを解釈して最終状態を構築
  • 重複排除、順序制御を自動で行う
  • SCD Type 1/2をサポート

5.6 なぜCDC処理にSTが向いているのか

理由はソース側の特性にあります:

  • CDCイベント(Debeziumからのメッセージなど)は次々と追加されていくデータ
  • 過去のイベントは変更されない
  • だから「新しく来たイベントだけ処理する」STの増分処理が効率的

MVだと毎回全イベントを再スキャンするので非効率です。

6. STとMVの使い分け:判断フローチャート

どちらを使うべきか迷ったときは、以下の基準で判断してください。

判断基準:

  • データ取り込み・ログ蓄積 → ソースがAppend-onlyならST、更新されるならMV
  • 集計・変換・フィルタリング → ソースが更新されるならMV、されないならSTも可
  • 複数テーブルの結合MV

6.1 ストリーミングテーブルを選ぶケース

  1. クラウドストレージからのデータ取り込み(Auto Loader)

    • 新しいファイルだけを処理したい
    • S3/ADLS/GCSに継続的にファイルが追加される
  2. CDC(Change Data Capture)の処理

    • Debezium等からの変更イベントを蓄積
    • AUTO CDCと組み合わせて使用
  3. ログデータの蓄積

    • アプリケーションログ、イベントログ
    • 一度書いたら変更されないデータ
  4. Kafkaなどのメッセージキューからの取り込み

    • リアルタイムストリーミング処理

6.2 マテリアライズドビューを選ぶケース

  1. 集計処理

    • SUM、COUNT、AVG などの集約関数を使用
    • GROUP BY を含むクエリ
  2. 複数テーブルの結合

    • JOINを使った変換処理
    • ディメンションテーブルとの結合
  3. ソースデータが更新される場合

    • UPDATE/DELETEが発生するソース
    • 増分処理では整合性が取れないケース
  4. ウィンドウ関数を使った分析

    • LAG、LEAD、ROW_NUMBER など

7. 実践編:メダリオンアーキテクチャでの使い分け

実際のパイプラインでは、STとMVを組み合わせて使います。典型的なパターンを見てみましょう。

7.1 Bronze層:ストリーミングテーブル

データ取り込みにはSTを使います。

from pyspark import pipelines as dp

# Bronzeテーブルの定義
dp.create_streaming_table(
    name="bronze_sales",
    comment="Raw sales data from cloud storage"
)

@dp.append_flow(target="bronze_sales")
def ingest_sales():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .load("/Volumes/my_catalog/my_schema/raw_data/sales")
    )

7.2 Silver層:状況に応じて選択

パターンA:フィルタリング・クレンジングのみ → ST

# Silver(ST):単純なフィルタリングならSTで増分処理
dp.create_streaming_table(
    name="silver_sales",
    expect_all_or_drop={
        "valid_amount": "amount > 0",
        "valid_product": "product_id IS NOT NULL"
    }
)

@dp.append_flow(target="silver_sales")
def clean_sales():
    return (
        spark.readStream.table("bronze_sales")
            .select("product_id", "customer_id", "amount", "timestamp")
    )

パターンB:ディメンションテーブルとの結合 → MV

# Silver(MV):結合が必要な場合はMV
@dp.table(name="silver_sales_enriched")
def silver_sales_enriched():
    sales = spark.read.table("bronze_sales")
    products = spark.read.table("dim_products")
    
    return (
        sales.join(products, "product_id", "left")
            .select(
                sales["*"],
                products["product_name"],
                products["category"]
            )
    )

7.3 Gold層:マテリアライズドビュー

集計レイヤーにはMVを使います。

# Gold(MV):集計処理
@dp.table(name="gold_daily_sales")
def gold_daily_sales():
    return (
        spark.read.table("silver_sales")
            .withColumn("date", to_date("timestamp"))
            .groupBy("date", "product_id")
            .agg(
                sum("amount").alias("daily_total"),
                count("*").alias("transaction_count")
            )
    )

7.4 全体の構成

メダリオンアーキテクチャでの典型的な構成は以下の通りです:

  1. Cloud Storagebronze_sales(ST, Auto Loader)
  2. bronze_sales(ST) → STREAM → silver_sales(ST)
  3. bronze_sales(ST) + dim_products(外部) → JOIN → silver_sales_enriched(MV)
  4. silver_sales_enriched(MV) → gold_daily_sales(MV)

8. よくあるつまづきポイントと解決策

8.1 ❌ STなのにSTREAMを忘れる

症状:
STからSTにデータを流しているのに、増分処理にならない。毎回全件処理される。

原因:

# ❌ 間違い:spark.read を使っている
@dp.append_flow(target="downstream_st")
def flow():
    return spark.read.table("upstream_st")  # バッチ読み取り!

解決策:

# ✅ 正しい:spark.readStream を使う
@dp.append_flow(target="downstream_st")
def flow():
    return spark.readStream.table("upstream_st")

8.2 ❌ MVでSTREAMを使おうとする

症状:
エラーが発生する。

原因:

-- ❌ 間違い:MVでSTREAMは使えない
CREATE OR REPLACE MATERIALIZED VIEW my_mv AS
SELECT * FROM STREAM source_table;

解決策:

-- ✅ 正しい:通常のSELECT
CREATE OR REPLACE MATERIALIZED VIEW my_mv AS
SELECT * FROM source_table;

8.3 ❌ 集計処理をSTでやろうとする

症状:
集計結果がおかしい。または実行時エラーになる。

原因:
append_flowは追記のみなので、GROUP BYなどの集計は過去データの再計算が必要になり、STには適しません。

# ❌ 間違い:STで集計しようとしている
dp.create_streaming_table(name="sales_agg")

@dp.append_flow(target="sales_agg")
def agg():
    return (
        spark.readStream.table("sales")
            .groupBy("product_id")
            .agg(sum("amount"))  # 集計はSTに不向き
    )

解決策:

# ✅ 正しい:MVで集計する
@dp.table(name="sales_agg")
def agg():
    return (
        spark.read.table("sales")
            .groupBy("product_id")
            .agg(sum("amount").alias("total"))
    )

8.4 ❌ フルリフレッシュしたいのにSTを使っている

症状:
ソースデータを修正したのに、下流のテーブルに反映されない。

原因:
STは増分処理なので、「既に処理済み」のデータは再処理されません。

解決策:

  1. Full Refreshを実行する(UIまたはAPI)
  2. そもそもMVを使うべきケースかもしれない
# ソースデータが頻繁に更新される場合はMVが適切
@dp.table(name="my_table")
def my_table():
    return spark.read.table("frequently_updated_source")

8.5 ❌ STの依存関係でMVを挟んでしまう

症状:
パイプラインの一部が増分処理にならない。

原因:
ST(bronze) → MV(silver) → ST(gold) という構成の場合、MVは毎回フル再計算するため、その下流のSTも実質的に全件処理になってしまいます。

解決策:
増分処理を維持したい場合は、ST(bronze) → ST(silver) → ST(gold) のようにSTの連鎖を保ちます。

8.6 ❌ 「STはappend-only」を誤解する

症状:
CDC処理でSTを使えないと思い込む。または、AUTO CDCの動作が理解できない。

誤解:
「STはappend-onlyだから、UPDATE/DELETEはできない」

正しい理解:

  • STの実態はDelta Table。Delta TableはUPDATE/DELETEができる
  • append-onlyなのはappend_flowというフローの種類
  • AUTO CDCを使えば、STに対してUPSERT/DELETEを適用できる
-- AUTO CDCならSTにUPSERT/DELETEできる
CREATE FLOW apply_changes
AS AUTO CDC INTO customers  -- このSTにはUPSERT/DELETEが適用される
FROM STREAM customers_cdc
KEYS (id)
APPLY AS DELETE WHEN op = 'DELETE'
...

9. よくある質問

Q: MVでAUTO CDCは使えますか?

A: 使えません。AUTO CDCのターゲットはSTのみです。MVは毎回フル再計算する設計なので、増分的にUPSERT/DELETEを適用するAUTO CDCとは相性が悪いためです。

Q: append_flowとAUTO CDCを同じSTに対して使えますか?

A: 技術的には可能ですが、推奨されません。データの整合性が取りにくくなります。1つのSTには1種類のフローを使うのが基本です。

Q: STとMVのどちらが高速ですか?

A: 状況によります。

  • データ量が多く、新規データが少ない場合: STが高速(増分処理のため)
  • 集計が必要な場合: MVを使うしかない
  • 毎回全件処理が必要な場合: MVのほうがシンプル

Q: STの「チェックポイント」はどこに保存されますか?

A: パイプラインのメタデータとして管理されます。ユーザーが直接操作することは基本的にありません。

10. まとめ:STとMVの選択基準

判断基準 ストリーミングテーブル マテリアライズドビュー
ソースがAppend-only ✅ 最適 ⚪ 使える
ソースが更新される ⚪ Full Refresh必要 ✅ 最適
集計処理 ❌ 不向き ✅ 最適
JOIN処理 △ 条件付き ✅ 最適
大量データの増分取り込み ✅ 最適 ❌ 非効率
CDC処理(イベント蓄積) ✅ 最適 ❌ 不向き
CDC処理(最終テーブル) ✅ AUTO CDC使用 ❌ 不可
リアルタイム性が必要 ✅ 最適 ⚪ バッチ的

覚えておくべきポイント

  1. データ取り込み(Ingestion)はST
  2. 集計・結合(Aggregation/Join)はMV
  3. STからSTにはSTREAM、MVでは使わない
  4. 「append-only」はフローの種類であり、STの制約ではない
  5. AUTO CDCを使えばSTにUPSERT/DELETEできる

これらを押さえておけば、SDPでのパイプライン構築がスムーズになるはずです。

参考リンク

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?