はじめに
Level 1とLevel 2で、マテリアライズドビュー(MV)とエクスペクテーションを学びました。SQLを書くだけでパイプラインが作れ、データ品質もチェックできるようになりました。
しかし、データ量が増えてくると新たな問題が出てきます。
「毎回全件処理するのは遅い...」
例えば、毎日100万件のログが追加されるシステムを考えてみましょう。
- 1日目: 100万件を処理 → OK
- 1週間後: 700万件を処理 → まだ耐えられる
- 1ヶ月後: 3000万件を処理 → 遅い...
- 1年後: 3.6億件を処理 → 現実的でない
MVは毎回全件をスキャンします。データが増えれば増えるほど、処理時間も増えていきます。
この問題を解決するのがストリーミングテーブル(ST)と増分処理です。
Lakeflow SDP入門:基礎から実践まで
本記事は、SDPを段階的に学ぶ学習パス「Lakeflow SDP入門:基礎から実践まで」の一部です。
| Level | タイトル | 所要時間 | 学ぶ概念 |
|---|---|---|---|
| 1 | SQLだけで始めるLakeflow SDP | 30分 | MV、パイプライン |
| 2 | Lakeflow SDPでデータ品質を守るエクスペクテーション | 30分 | エクスペクテーション |
| 3 | Lakeflow SDPの増分処理とストリーミングテーブル(本記事) | 45分 | ST、増分処理 |
| 4 | Lakeflow SDPのフローを理解する | 45分 | フロー、append_flow |
| 5 | Lakeflow SDPのAUTO CDCでマスターデータ同期 | 60分 | AUTO CDC、SCD |
この記事で学ぶこと
- なぜ増分処理が必要か
- ストリーミングテーブル(ST)の動作原理
- 「新規データ」の判定単位(ソースによる違い)
- STとMVの使い分け
前提条件
- Level 1, 2を完了している、またはSDPの基本操作ができる
- SQLの基本が書ける
増分処理とは
一言で説明すると
「前回処理した続きから、新しいデータだけを処理する」 ことです。
MVとSTの違い
| 項目 | マテリアライズドビュー(MV) | ストリーミングテーブル(ST) |
|---|---|---|
| データの読み方 | 毎回全件スキャン | 前回の続きから |
| 処理対象 | 全データ | 新規データのみ |
| データ量が増えると | 処理時間も増加 | 処理時間は一定 |
| 適したユースケース | 集計、結合、変換 | データ取り込み、ログ蓄積 |
図で表すと:
初回実行では、MVもSTも全件(A, B, C)を処理します。STはチェックポイントに"C"を記録します。
2回目以降の差が出ます。MVは全件(5件)を再処理しますが、STは新規データ(D, E)の2件だけを処理します。
データが増えるほど差が広がります。MVは7件すべてを処理しますが、STは新規の2件(F, G)だけです。
STはチェックポイント(CP) で「どこまで処理したか」を記録し、次回は続きから処理します。
「新規データ」の判定単位
増分処理で重要なのは、「何を新規とみなすか」 です。これはソースの種類によって異なります。
ソース別の新規判定単位
| ソース | 新規判定の単位 | 追跡方法 |
|---|---|---|
| クラウドストレージ(Auto Loader) | ファイル単位 | 新しいファイルの存在をチェックポイントで記録 |
| Kafka等のメッセージング | メッセージ(レコード)単位 | オフセットで追跡 |
| Delta Table(STREAM読み取り) | コミット単位 | Delta Tableのバージョンで追跡 |
重要: Auto Loaderはファイル単位
クラウドストレージからAuto Loaderでデータを取り込む場合、ファイル単位で新規判定されます。
つまり:
- 新しいファイルが追加された → 処理対象になる
- 既存ファイルの中身が変更された → 検知されない(処理されない)
/data/
├── file1.json ← 1回目で処理済み(変更しても再処理されない)
├── file2.json ← 1回目で処理済み
└── file3.json ← 2回目で新規として処理される
これは重要なポイントです。Auto Loaderを使う場合は、既存ファイルを書き換えるのではなく、新しいファイルを追加していく運用が前提です。
✅ 推奨: 新しいファイルを追加
/data/
├── 2024-01-01.json
├── 2024-01-02.json ← 新規追加 → 処理される
└── 2024-01-03.json ← 新規追加 → 処理される
❌ 非推奨: 既存ファイルを上書き
/data/
└── data.json ← 中身を更新しても再処理されない
STからSTへのデータの流れ
パイプライン内で、あるSTから別のSTにデータを流す場合も増分処理になります。
仕組みはシンプルです:
- 上流のSTに新しい行が追加されたら、その行だけが下流のSTで処理される
- 上流STの「どこまで処理したか」がチェックポイントとして記録される
-- 上流ST
CREATE STREAMING TABLE upstream;
-- 下流ST: 上流に追加された行だけを増分処理
CREATE STREAMING TABLE downstream;
CREATE FLOW process_data AS
INSERT INTO downstream BY NAME
SELECT * FROM STREAM upstream; -- STREAMキーワードで増分読み取り
例えば:
- 1回目: 上流STに100行ある → 100行すべて処理
- 2回目: 上流STに20行追加された → 追加された20行だけ処理
- 3回目: 上流STに50行追加された → 追加された50行だけ処理
ストリーミングテーブルの基本構文
STの定義方法
STは「テーブル定義」と「データの流し込み方(フロー)」が分離されています。
-- Step 1: テーブル(箱)を定義
CREATE STREAMING TABLE my_events;
-- Step 2: データの流し込み方(フロー)を定義
CREATE FLOW ingest_events AS
INSERT INTO my_events BY NAME
SELECT * FROM STREAM read_files('/path/to/data');
MVとの違い:
-- MVは定義とデータソースが一体
CREATE MATERIALIZED VIEW my_view AS
SELECT * FROM source_table;
-- STは分離されている
CREATE STREAMING TABLE my_table; -- 箱だけ
CREATE FLOW my_flow AS ...; -- データの入れ方
この「フロー」については、Level 4で詳しく学びます。今回は「STにデータを入れる方法」とだけ理解しておいてください。
STREAMキーワード
STの定義で重要なのがSTREAMキーワードです。
-- ✅ 増分読み取り: 新しいデータのみ処理
SELECT * FROM STREAM read_files('/path/to/data');
SELECT * FROM STREAM upstream_table;
-- ❌ バッチ読み取り: 毎回全件処理(STREAMなし)
SELECT * FROM read_files('/path/to/data');
SELECT * FROM upstream_table;
STREAMを付け忘れると、増分処理にならず毎回全件処理になってしまいます。これはよくある間違いなので注意してください。
ハンズオン: ストリーミングテーブルを作成する
実際にSTを作成して、増分処理を体験してみましょう。ファイルを追加するたびに「新規分だけが処理される」ことを確認します。
Step 1: 新しいパイプラインを作成
- 左サイドバーで新規をクリックし、ETL パイプラインを選択
- パイプライン名を入力(例:
streaming-table-demo) - カタログとスキーマを選択
- 空のファイルで開始を選択
- 言語はSQLを選択
- 選択をクリック
Step 2: データ保存用のボリュームを作成
まず、データを保存するボリュームを作成します。SQLエディタまたはノートブックで以下を実行してください。
-- ボリュームを作成(カタログ・スキーマは適宜変更)
CREATE VOLUME IF NOT EXISTS workspace.sdp.demo_volume;
Step 3: 最初のデータファイルを作成
ボリュームにCSVファイルを作成します。ノートブックで以下を実行してください。
data_batch1 = """order_id,customer_id,amount,order_date
1,101,15000,2024-01-15
2,102,8500,2024-01-16
3,103,22000,2024-01-17
"""
# ボリュームのパスは適宜変更してください
dbutils.fs.put("/Volumes/workspace/sdp/demo_volume/orders_batch1.csv", data_batch1, overwrite=True)
print("batch1を作成しました(3件)")
Step 4: Bronze層のストリーミングテーブルを作成
パイプラインエディタに戻り、Auto Loaderでファイルを取り込むSTを定義します。
-- Bronze層: ボリュームからCSVを増分取り込み
CREATE STREAMING TABLE bronze_orders;
CREATE FLOW ingest_orders AS
INSERT INTO bronze_orders BY NAME
SELECT
order_id::INT,
customer_id::INT,
amount::DOUBLE,
order_date::DATE
FROM STREAM read_files(
'/Volumes/workspace/sdp/demo_volume/',
format => 'csv',
header => 'true'
);
/Volumes/workspace/sdp/demo_volume/は、Step 2で作成したボリュームのパスに置き換えてください。
ファイルを実行をクリックして実行します。
実行後、下部パネルのデータタブで3件のデータが取り込まれていることを確認してください。
Step 5: 追加のデータファイルを作成
ノートブックで2つ目のファイルを追加します。
data_batch2 = """order_id,customer_id,amount,order_date
4,104,5000,2024-01-18
5,105,18000,2024-01-19
"""
dbutils.fs.put("/Volumes/workspace/sdp/demo_volume/orders_batch2.csv", data_batch2, overwrite=True)
print("batch2を作成しました(2件)")
Step 6: 増分処理を確認
パイプラインエディタで再度ファイルを実行をクリックします。
確認ポイント:
- 処理されたのはbatch2の2件だけ
- batch1の3件は再処理されていない
- 結果テーブルには合計5件が格納されている
これが増分処理です。Auto Loaderが「どのファイルを処理済みか」をチェックポイントで記録しているため、新しいファイルだけが処理されます。
Step 7: 下流のストリーミングテーブルを作成
Bronze層のSTから、Silver層のSTにデータを流します。
-- Silver層: 高額注文のみをフィルタリング
CREATE STREAMING TABLE silver_high_value_orders;
CREATE FLOW filter_high_value AS
INSERT INTO silver_high_value_orders BY NAME
SELECT *
FROM STREAM bronze_orders -- STREAMキーワードで増分読み取り
WHERE amount >= 10000;
Step 8: エクスペクテーションを追加
Level 2で学んだエクスペクテーションは、STにも適用できます。
-- Silver層: エクスペクテーション付きストリーミングテーブル
CREATE STREAMING TABLE silver_orders_validated (
CONSTRAINT positive_amount EXPECT (amount > 0) ON VIOLATION DROP ROW,
CONSTRAINT valid_date EXPECT (order_date >= '2024-01-01') ON VIOLATION DROP ROW
);
CREATE FLOW validate_orders AS
INSERT INTO silver_orders_validated BY NAME
SELECT *
FROM STREAM bronze_orders;
Step 9: Gold層はマテリアライズドビューで
集計処理はMVを使います。STは増分処理に向いていますが、GROUP BYなどの集計は全データを見る必要があるためです。
-- Gold層: 集計はマテリアライズドビュー
CREATE MATERIALIZED VIEW gold_daily_sales AS
SELECT
order_date,
COUNT(*) AS order_count,
SUM(amount) AS total_sales
FROM silver_orders_validated -- STREAMなし(バッチ読み取り)
GROUP BY order_date
ORDER BY order_date;
パイプラインを実行をクリックして、全体を実行します。
Step 10: さらにデータを追加して増分処理を確認
data_batch3 = """order_id,customer_id,amount,order_date
6,106,12000,2024-01-20
7,107,3000,2024-01-20
8,108,25000,2024-01-21
"""
dbutils.fs.put("/Volumes/workspace/sdp/demo_volume/orders_batch3.csv", data_batch3, overwrite=True)
print("batch3を作成しました(3件)")
パイプラインを再実行すると:
| 層 | タイプ | 処理内容 |
|---|---|---|
| Bronze | ST | batch3の3件だけを処理(増分) |
| Silver | ST | Bronze層に追加された3件だけを処理(増分) |
| Gold | MV | 全8件を再集計(フルスキャン)。出力レコードが7件になっているのはグルーピングした結果のレコード数を表示しているためです。 |
これがSTとMVの動作の違いです。
STとMVの使い分け
判断基準
| ユースケース | ST or MV | 理由 |
|---|---|---|
| ファイル取り込み | ST | Auto Loaderで効率的に増分処理 |
| ログ・イベント蓄積 | ST | 追記のみ、増分処理が効率的 |
| フィルタリング(WHERE) | ST | 増分処理可能 |
| カラム選択・変換 | ST | 増分処理可能 |
| 日次/月次集計(GROUP BY) | MV | 全データを見る必要がある |
| 複数テーブルの結合(JOIN) | MV | 増分処理が難しい |
| マスターデータ参照 | MV | 参照先が更新される可能性 |
簡単な判断フロー
- データ取り込み? → ST
- 集計(GROUP BY)が必要? → MV
- 結合(JOIN)が必要? → MV
- それ以外の変換? → ST
メダリオンアーキテクチャでの使い分け
典型的な構成:
| 層 | 推奨 | 理由 |
|---|---|---|
| Bronze | ST | データ取り込みは増分処理が効率的 |
| Silver | ST | フィルタリング、型変換は増分処理可能 |
| Silver(結合あり) | MV | JOINが必要な場合 |
| Gold | MV | 集計処理が中心 |
注意点とよくある間違い
1. STREAMキーワードの付け忘れ
-- ❌ 間違い: STREAMがない → 毎回全件処理
CREATE FLOW my_flow AS
INSERT INTO downstream BY NAME
SELECT * FROM upstream_table;
-- ✅ 正しい: STREAMで増分読み取り
CREATE FLOW my_flow AS
INSERT INTO downstream BY NAME
SELECT * FROM STREAM upstream_table;
2. MVでSTREAMを使おうとする
-- ❌ エラー: MVではSTREAMは使えない
CREATE MATERIALIZED VIEW my_mv AS
SELECT * FROM STREAM source_table;
-- ✅ 正しい: MVは通常のSELECT
CREATE MATERIALIZED VIEW my_mv AS
SELECT * FROM source_table;
3. STで集計しようとする
-- ❌ 問題: STで集計は適切でない
CREATE STREAMING TABLE sales_summary;
CREATE FLOW summarize AS
INSERT INTO sales_summary BY NAME
SELECT product_id, SUM(amount) as total
FROM STREAM sales
GROUP BY product_id;
-- ✅ 正しい: 集計はMVで
CREATE MATERIALIZED VIEW sales_summary AS
SELECT product_id, SUM(amount) as total
FROM sales
GROUP BY product_id;
4. 既存ファイルの更新を期待する
Auto Loaderは新しいファイルのみを検知します。既存ファイルを更新しても再処理されません。
-- 期待する動作(実際には起きない)
file1.jsonを更新 → 再処理される
-- 実際の動作
file1.jsonを更新 → 検知されない(処理されない)
file2.jsonを新規追加 → 処理される
データを修正したい場合は、新しいファイルとして追加するか、フルリフレッシュを実行します。
まとめ
今日できるようになったこと
- MVの限界(毎回フルスキャン)を理解した
- STの増分処理の仕組みを理解した
- 「新規データ」の判定単位(ファイル/メッセージ/コミット)を理解した
- STとMVを使い分けられるようになった
STの価値
- 効率的: 新しいデータだけを処理
- スケーラブル: データ量が増えても処理時間は一定
- リアルタイム対応: 継続的なデータ取り込みに最適
次のステップ
ここまでで、STの基本を学びました。しかし、まだ疑問が残っているかもしれません:
- 「フローって何?」
- 「なぜSTは箱とデータの入れ方が分離されているの?」
- 「複数のソースから1つのSTにデータを入れられる?」
次の記事 Level 4: Lakeflow SDPのフローを理解するでは、フローの概念と使い方を詳しく学びます。
参考リンク
- Lakeflow Spark宣言型パイプライン公式ドキュメント
- ストリーミングテーブル
- マテリアライズドビュー
- フロー
- Auto Loaderとは
- パイプラインの更新
- チュートリアル: Lakeflow Spark宣言型パイプラインを使用してETLパイプラインを構築する












