0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Lakeflow SDPの増分処理とストリーミングテーブル

Last updated at Posted at 2025-12-21

はじめに

Level 1とLevel 2で、マテリアライズドビュー(MV)とエクスペクテーションを学びました。SQLを書くだけでパイプラインが作れ、データ品質もチェックできるようになりました。

しかし、データ量が増えてくると新たな問題が出てきます。

「毎回全件処理するのは遅い...」

例えば、毎日100万件のログが追加されるシステムを考えてみましょう。

  • 1日目: 100万件を処理 → OK
  • 1週間後: 700万件を処理 → まだ耐えられる
  • 1ヶ月後: 3000万件を処理 → 遅い...
  • 1年後: 3.6億件を処理 → 現実的でない

MVは毎回全件をスキャンします。データが増えれば増えるほど、処理時間も増えていきます。

この問題を解決するのがストリーミングテーブル(ST)増分処理です。

Lakeflow SDP入門:基礎から実践まで

本記事は、SDPを段階的に学ぶ学習パス「Lakeflow SDP入門:基礎から実践まで」の一部です。

Level タイトル 所要時間 学ぶ概念
1 SQLだけで始めるLakeflow SDP 30分 MV、パイプライン
2 Lakeflow SDPでデータ品質を守るエクスペクテーション 30分 エクスペクテーション
3 Lakeflow SDPの増分処理とストリーミングテーブル(本記事) 45分 ST、増分処理
4 Lakeflow SDPのフローを理解する 45分 フロー、append_flow
5 Lakeflow SDPのAUTO CDCでマスターデータ同期 60分 AUTO CDC、SCD

この記事で学ぶこと

  • なぜ増分処理が必要か
  • ストリーミングテーブル(ST)の動作原理
  • 「新規データ」の判定単位(ソースによる違い)
  • STとMVの使い分け

前提条件

  • Level 1, 2を完了している、またはSDPの基本操作ができる
  • SQLの基本が書ける

増分処理とは

一言で説明すると

「前回処理した続きから、新しいデータだけを処理する」 ことです。

MVとSTの違い

項目 マテリアライズドビュー(MV) ストリーミングテーブル(ST)
データの読み方 毎回全件スキャン 前回の続きから
処理対象 全データ 新規データのみ
データ量が増えると 処理時間も増加 処理時間は一定
適したユースケース 集計、結合、変換 データ取り込み、ログ蓄積

図で表すと:

1回目の実行
Screenshot 2025-12-22 at 6.58.08.png

初回実行では、MVもSTも全件(A, B, C)を処理します。STはチェックポイントに"C"を記録します。

2回目の実行
Screenshot 2025-12-22 at 6.58.49.png

2回目以降の差が出ます。MVは全件(5件)を再処理しますが、STは新規データ(D, E)の2件だけを処理します。

3回目の実行
Screenshot 2025-12-22 at 6.59.22.png

データが増えるほど差が広がります。MVは7件すべてを処理しますが、STは新規の2件(F, G)だけです。

STはチェックポイント(CP) で「どこまで処理したか」を記録し、次回は続きから処理します。

「新規データ」の判定単位

増分処理で重要なのは、「何を新規とみなすか」 です。これはソースの種類によって異なります。

ソース別の新規判定単位

ソース 新規判定の単位 追跡方法
クラウドストレージ(Auto Loader) ファイル単位 新しいファイルの存在をチェックポイントで記録
Kafka等のメッセージング メッセージ(レコード)単位 オフセットで追跡
Delta Table(STREAM読み取り) コミット単位 Delta Tableのバージョンで追跡

重要: Auto Loaderはファイル単位

クラウドストレージからAuto Loaderでデータを取り込む場合、ファイル単位で新規判定されます。

つまり:

  • 新しいファイルが追加された → 処理対象になる
  • 既存ファイルの中身が変更された → 検知されない(処理されない)
/data/
  ├── file1.json  ← 1回目で処理済み(変更しても再処理されない)
  ├── file2.json  ← 1回目で処理済み
  └── file3.json  ← 2回目で新規として処理される

これは重要なポイントです。Auto Loaderを使う場合は、既存ファイルを書き換えるのではなく、新しいファイルを追加していく運用が前提です。

✅ 推奨: 新しいファイルを追加
/data/
  ├── 2024-01-01.json
  ├── 2024-01-02.json  ← 新規追加 → 処理される
  └── 2024-01-03.json  ← 新規追加 → 処理される

❌ 非推奨: 既存ファイルを上書き
/data/
  └── data.json  ← 中身を更新しても再処理されない

STからSTへのデータの流れ

パイプライン内で、あるSTから別のSTにデータを流す場合も増分処理になります。

仕組みはシンプルです:

  • 上流のSTに新しい行が追加されたら、その行だけが下流のSTで処理される
  • 上流STの「どこまで処理したか」がチェックポイントとして記録される
-- 上流ST
CREATE STREAMING TABLE upstream;

-- 下流ST: 上流に追加された行だけを増分処理
CREATE STREAMING TABLE downstream;

CREATE FLOW process_data AS
INSERT INTO downstream BY NAME
SELECT * FROM STREAM upstream;  -- STREAMキーワードで増分読み取り

例えば:

  • 1回目: 上流STに100行ある → 100行すべて処理
  • 2回目: 上流STに20行追加された → 追加された20行だけ処理
  • 3回目: 上流STに50行追加された → 追加された50行だけ処理

ストリーミングテーブルの基本構文

STの定義方法

STは「テーブル定義」と「データの流し込み方(フロー)」が分離されています。

-- Step 1: テーブル(箱)を定義
CREATE STREAMING TABLE my_events;

-- Step 2: データの流し込み方(フロー)を定義
CREATE FLOW ingest_events AS
INSERT INTO my_events BY NAME
SELECT * FROM STREAM read_files('/path/to/data');

MVとの違い:

-- MVは定義とデータソースが一体
CREATE MATERIALIZED VIEW my_view AS
SELECT * FROM source_table;

-- STは分離されている
CREATE STREAMING TABLE my_table;  -- 箱だけ
CREATE FLOW my_flow AS ...;       -- データの入れ方

この「フロー」については、Level 4で詳しく学びます。今回は「STにデータを入れる方法」とだけ理解しておいてください。

STREAMキーワード

STの定義で重要なのがSTREAMキーワードです。

-- ✅ 増分読み取り: 新しいデータのみ処理
SELECT * FROM STREAM read_files('/path/to/data');
SELECT * FROM STREAM upstream_table;

-- ❌ バッチ読み取り: 毎回全件処理(STREAMなし)
SELECT * FROM read_files('/path/to/data');
SELECT * FROM upstream_table;

STREAMを付け忘れると、増分処理にならず毎回全件処理になってしまいます。これはよくある間違いなので注意してください。

ハンズオン: ストリーミングテーブルを作成する

実際にSTを作成して、増分処理を体験してみましょう。ファイルを追加するたびに「新規分だけが処理される」ことを確認します。

Step 1: 新しいパイプラインを作成

  1. 左サイドバーで新規をクリックし、ETL パイプラインを選択
  2. パイプライン名を入力(例: streaming-table-demo)
  3. カタログとスキーマを選択
  4. 空のファイルで開始を選択
  5. 言語はSQLを選択
  6. 選択をクリック

Step 2: データ保存用のボリュームを作成

まず、データを保存するボリュームを作成します。SQLエディタまたはノートブックで以下を実行してください。

-- ボリュームを作成(カタログ・スキーマは適宜変更)
CREATE VOLUME IF NOT EXISTS workspace.sdp.demo_volume;

Screenshot 2025-12-22 at 7.16.26.png

Step 3: 最初のデータファイルを作成

ボリュームにCSVファイルを作成します。ノートブックで以下を実行してください。

data_batch1 = """order_id,customer_id,amount,order_date
1,101,15000,2024-01-15
2,102,8500,2024-01-16
3,103,22000,2024-01-17
"""

# ボリュームのパスは適宜変更してください
dbutils.fs.put("/Volumes/workspace/sdp/demo_volume/orders_batch1.csv", data_batch1, overwrite=True)

print("batch1を作成しました(3件)")

Screenshot 2025-12-22 at 7.18.50.png

Step 4: Bronze層のストリーミングテーブルを作成

パイプラインエディタに戻り、Auto Loaderでファイルを取り込むSTを定義します。

-- Bronze層: ボリュームからCSVを増分取り込み
CREATE STREAMING TABLE bronze_orders;

CREATE FLOW ingest_orders AS
INSERT INTO bronze_orders BY NAME
SELECT 
    order_id::INT,
    customer_id::INT,
    amount::DOUBLE,
    order_date::DATE
FROM STREAM read_files(
    '/Volumes/workspace/sdp/demo_volume/',
    format => 'csv',
    header => 'true'
);

/Volumes/workspace/sdp/demo_volume/は、Step 2で作成したボリュームのパスに置き換えてください。

ファイルを実行をクリックして実行します。

実行後、下部パネルのデータタブで3件のデータが取り込まれていることを確認してください。

Screenshot 2025-12-22 at 7.20.42.png
Screenshot 2025-12-22 at 7.29.59.png

Step 5: 追加のデータファイルを作成

ノートブックで2つ目のファイルを追加します。

data_batch2 = """order_id,customer_id,amount,order_date
4,104,5000,2024-01-18
5,105,18000,2024-01-19
"""

dbutils.fs.put("/Volumes/workspace/sdp/demo_volume/orders_batch2.csv", data_batch2, overwrite=True)

print("batch2を作成しました(2件)")

Screenshot 2025-12-22 at 7.21.45.png

Step 6: 増分処理を確認

パイプラインエディタで再度ファイルを実行をクリックします。

Screenshot 2025-12-22 at 7.23.03.png
Screenshot 2025-12-22 at 7.23.25.png

確認ポイント:

  • 処理されたのはbatch2の2件だけ
  • batch1の3件は再処理されていない
  • 結果テーブルには合計5件が格納されている

これが増分処理です。Auto Loaderが「どのファイルを処理済みか」をチェックポイントで記録しているため、新しいファイルだけが処理されます。

Step 7: 下流のストリーミングテーブルを作成

Bronze層のSTから、Silver層のSTにデータを流します。

-- Silver層: 高額注文のみをフィルタリング
CREATE STREAMING TABLE silver_high_value_orders;

CREATE FLOW filter_high_value AS
INSERT INTO silver_high_value_orders BY NAME
SELECT *
FROM STREAM bronze_orders  -- STREAMキーワードで増分読み取り
WHERE amount >= 10000;

Step 8: エクスペクテーションを追加

Level 2で学んだエクスペクテーションは、STにも適用できます。

-- Silver層: エクスペクテーション付きストリーミングテーブル
CREATE STREAMING TABLE silver_orders_validated (
    CONSTRAINT positive_amount EXPECT (amount > 0) ON VIOLATION DROP ROW,
    CONSTRAINT valid_date EXPECT (order_date >= '2024-01-01') ON VIOLATION DROP ROW
);

CREATE FLOW validate_orders AS
INSERT INTO silver_orders_validated BY NAME
SELECT *
FROM STREAM bronze_orders;

Step 9: Gold層はマテリアライズドビュー

集計処理はMVを使います。STは増分処理に向いていますが、GROUP BYなどの集計は全データを見る必要があるためです。

-- Gold層: 集計はマテリアライズドビュー
CREATE MATERIALIZED VIEW gold_daily_sales AS
SELECT 
    order_date,
    COUNT(*) AS order_count,
    SUM(amount) AS total_sales
FROM silver_orders_validated  -- STREAMなし(バッチ読み取り)
GROUP BY order_date
ORDER BY order_date;

パイプラインを実行をクリックして、全体を実行します。

Screenshot 2025-12-22 at 7.32.28.png

Step 10: さらにデータを追加して増分処理を確認

data_batch3 = """order_id,customer_id,amount,order_date
6,106,12000,2024-01-20
7,107,3000,2024-01-20
8,108,25000,2024-01-21
"""

dbutils.fs.put("/Volumes/workspace/sdp/demo_volume/orders_batch3.csv", data_batch3, overwrite=True)

print("batch3を作成しました(3件)")

パイプラインを再実行すると:

タイプ 処理内容
Bronze ST batch3の3件だけを処理(増分)
Silver ST Bronze層に追加された3件だけを処理(増分)
Gold MV 全8件を再集計(フルスキャン)。出力レコードが7件になっているのはグルーピングした結果のレコード数を表示しているためです。

Screenshot 2025-12-22 at 7.35.44.png
Screenshot 2025-12-22 at 7.37.09.png

これがSTとMVの動作の違いです。

STとMVの使い分け

判断基準

ユースケース ST or MV 理由
ファイル取り込み ST Auto Loaderで効率的に増分処理
ログ・イベント蓄積 ST 追記のみ、増分処理が効率的
フィルタリング(WHERE) ST 増分処理可能
カラム選択・変換 ST 増分処理可能
日次/月次集計(GROUP BY) MV 全データを見る必要がある
複数テーブルの結合(JOIN) MV 増分処理が難しい
マスターデータ参照 MV 参照先が更新される可能性

簡単な判断フロー

  1. データ取り込み? → ST
  2. 集計(GROUP BY)が必要? → MV
  3. 結合(JOIN)が必要? → MV
  4. それ以外の変換? → ST

メダリオンアーキテクチャでの使い分け

典型的な構成:

推奨 理由
Bronze ST データ取り込みは増分処理が効率的
Silver ST フィルタリング、型変換は増分処理可能
Silver(結合あり) MV JOINが必要な場合
Gold MV 集計処理が中心

注意点とよくある間違い

1. STREAMキーワードの付け忘れ

-- ❌ 間違い: STREAMがない → 毎回全件処理
CREATE FLOW my_flow AS
INSERT INTO downstream BY NAME
SELECT * FROM upstream_table;

-- ✅ 正しい: STREAMで増分読み取り
CREATE FLOW my_flow AS
INSERT INTO downstream BY NAME
SELECT * FROM STREAM upstream_table;

2. MVでSTREAMを使おうとする

-- ❌ エラー: MVではSTREAMは使えない
CREATE MATERIALIZED VIEW my_mv AS
SELECT * FROM STREAM source_table;

-- ✅ 正しい: MVは通常のSELECT
CREATE MATERIALIZED VIEW my_mv AS
SELECT * FROM source_table;

3. STで集計しようとする

-- ❌ 問題: STで集計は適切でない
CREATE STREAMING TABLE sales_summary;

CREATE FLOW summarize AS
INSERT INTO sales_summary BY NAME
SELECT product_id, SUM(amount) as total
FROM STREAM sales
GROUP BY product_id;

-- ✅ 正しい: 集計はMVで
CREATE MATERIALIZED VIEW sales_summary AS
SELECT product_id, SUM(amount) as total
FROM sales
GROUP BY product_id;

4. 既存ファイルの更新を期待する

Auto Loaderは新しいファイルのみを検知します。既存ファイルを更新しても再処理されません。

-- 期待する動作(実際には起きない)
file1.jsonを更新 → 再処理される

-- 実際の動作
file1.jsonを更新 → 検知されない(処理されない)
file2.jsonを新規追加 → 処理される

データを修正したい場合は、新しいファイルとして追加するか、フルリフレッシュを実行します。

まとめ

今日できるようになったこと

  • MVの限界(毎回フルスキャン)を理解した
  • STの増分処理の仕組みを理解した
  • 「新規データ」の判定単位(ファイル/メッセージ/コミット)を理解した
  • STとMVを使い分けられるようになった

STの価値

  • 効率的: 新しいデータだけを処理
  • スケーラブル: データ量が増えても処理時間は一定
  • リアルタイム対応: 継続的なデータ取り込みに最適

次のステップ

ここまでで、STの基本を学びました。しかし、まだ疑問が残っているかもしれません:

  • 「フローって何?」
  • 「なぜSTは箱とデータの入れ方が分離されているの?」
  • 「複数のソースから1つのSTにデータを入れられる?」

次の記事 Level 4: Lakeflow SDPのフローを理解するでは、フローの概念と使い方を詳しく学びます。

参考リンク

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?