はじめに
Level 1では、SQLだけでパイプラインを作成し、依存関係が自動で解決されることを体験しました。
しかし、実際のデータパイプラインでは避けられない問題があります。
「不正なデータが混入したらどうする?」
- 価格がマイナスの注文データ
- 必須項目がNULLのレコード
- 想定外の値が入ったカラム
従来のアプローチでは、WHERE句でフィルタリングしたり、別途データ品質チェックのジョブを作成したりしていました。しかし、これには問題があります:
- 見えない: どれだけの不正データがあったか分からない
- 管理が分散: 品質ルールとETLロジックが別々の場所に
- 柔軟性がない: 「警告だけ」「除外する」「失敗させる」の使い分けができない
SDPのエクスペクテーションは、これらの問題を解決します。
Lakeflow SDP入門:基礎から実践まで
本記事は、SDPを段階的に学ぶ学習パス「Lakeflow SDP入門:基礎から実践まで」の一部です。各レベルで1つずつ新しい概念を学び、無理なくステップアップできる構成になっています。
| Level | タイトル | 所要時間 | 学ぶ概念 |
|---|---|---|---|
| 1 | SQLだけで始めるLakeflow SDP | 30分 | MV、パイプライン |
| 2 | Lakeflow SDPでデータ品質を守るエクスペクテーション(本記事) | 30分 | エクスペクテーション |
| 3 | Lakeflow SDPの増分処理とストリーミングテーブル | 45分 | ST、増分処理 |
| 4 | Lakeflow SDPのフローを理解する | 45分 | フロー、append_flow |
| 5 | Lakeflow SDPのAUTO CDCでマスターデータ同期 | 60分 | AUTO CDC、SCD |
この記事で学ぶこと
- エクスペクテーションとは何か
- 3つのモードの使い分け(警告 / 除外 / 失敗)
- 品質メトリクスの確認方法
- よくある品質ルールのパターン
前提条件
- Level 1を完了している、またはSDPの基本操作ができる
- SQLの基本が書ける
エクスペクテーションとは
一言で説明すると
「データが満たすべき条件を宣言し、違反時の動作を指定する仕組み」 です。
従来のアプローチとの違い
従来のアプローチ: WHERE句でフィルタリング
CREATE MATERIALIZED VIEW clean_orders AS
SELECT * FROM raw_orders
WHERE price > 0 AND customer_id IS NOT NULL;
この方法の問題点:
- 何件のデータが除外されたか分からない
- 品質ルールがクエリに埋もれて見えにくい
- 「警告だけ出して処理は続ける」ができない
エクスペクテーション: 宣言的に品質ルールを定義
CREATE MATERIALIZED VIEW clean_orders (
CONSTRAINT valid_price EXPECT (price > 0) ON VIOLATION DROP ROW,
CONSTRAINT valid_customer EXPECT (customer_id IS NOT NULL) ON VIOLATION DROP ROW
) AS
SELECT * FROM raw_orders;
この方法のメリット:
- 違反件数がメトリクスとして記録される
- 品質ルールが明示的に定義される
- 違反時の動作を柔軟に選べる
3つのモードを理解する
エクスペクテーションには3つのモードがあります。
| モード | 構文 | 違反時の動作 | ユースケース |
|---|---|---|---|
| 警告のみ | EXPECT (条件) |
警告を記録し、処理を継続 | 監視・分析用 |
| 行を除外 | EXPECT (条件) ON VIOLATION DROP ROW |
違反行を除外して処理を継続 | データクレンジング |
| 失敗 | EXPECT (条件) ON VIOLATION FAIL UPDATE |
パイプライン全体を失敗させる | 絶対に許容できない品質問題 |
それぞれ詳しく見ていきましょう。
モード1: 警告のみ(デフォルト)
CREATE MATERIALIZED VIEW orders_monitored (
CONSTRAINT warn_high_value EXPECT (price < 1000000)
) AS
SELECT * FROM raw_orders;
動作:
- 違反があっても全レコードが処理される
- 違反件数がメトリクスとして記録される
使いどころ:
- まず現状を把握したいとき
- 異常値の傾向を監視したいとき
- 将来的にルールを厳しくする前の準備段階
モード2: 違反行を除外(DROP ROW)
CREATE MATERIALIZED VIEW orders_cleaned (
CONSTRAINT valid_price EXPECT (price > 0) ON VIOLATION DROP ROW,
CONSTRAINT valid_date EXPECT (order_date IS NOT NULL) ON VIOLATION DROP ROW
) AS
SELECT * FROM raw_orders;
動作:
- 違反した行は結果テーブルに含まれない
- 違反件数がメトリクスとして記録される
- パイプラインは継続する
使いどころ:
- 不正データを下流に流したくないとき
- データクレンジングの定番パターン
- 「多少のデータ欠損は許容できる」ケース
モード3: パイプラインを失敗させる(FAIL UPDATE)
CREATE MATERIALIZED VIEW orders_critical (
CONSTRAINT must_have_customer EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
) AS
SELECT * FROM raw_orders;
動作:
- 1件でも違反があるとパイプライン全体が失敗する
- 下流のテーブルも更新されない
使いどころ:
- 絶対に許容できない品質問題
- 外部システムとの整合性が必須のデータ
- 規制要件で欠損が許されないケース
ハンズオン: パイプラインにエクスペクテーションを追加する
Level 1で作成したパイプラインを拡張して、エクスペクテーションを追加してみましょう。
Step 1: 既存のパイプラインを開く
Step 2: 警告のみのエクスペクテーションを追加
まず、現状把握のために警告のみのエクスペクテーションを追加します。
既存のsilver_completed_ordersを以下のように修正します:
-- Silver層: エクスペクテーション付き
CREATE MATERIALIZED VIEW silver_completed_orders (
-- 警告のみ: 高額注文を監視
CONSTRAINT warn_high_value EXPECT (o_totalprice < 500000)
) AS
SELECT
o_orderkey,
o_custkey,
o_totalprice,
o_orderdate,
o_orderpriority
FROM bronze_orders
WHERE o_orderstatus = 'F';
ファイルを実行をクリックして実行します。
Step 3: 品質メトリクスを確認する
実行が完了したら:
- パイプライングラフで
silver_completed_ordersをクリック - 下部パネルのテーブル指標タブを確認
-
警告の「▲36」をクリック(あるいはエクスペクテーションの「1件が未達成」)してデータ品質パネルを表示
以下の情報が表示されます:
- 制約名(
warn_high_value) - 書き込みレコード数
- 失敗レコード数
- 失敗率(%)
Step 4: 違反行を除外するエクスペクテーションを追加
次に、品質問題のあるデータを除外するテーブルを作成します。
samples.tpch.ordersは品質の高いサンプルデータなので、NULLや不正値はありません。ここでは「ビジネスルールに基づくフィルタリング」として、一定の条件を満たさないデータを除外する例を見てみましょう。
新しいファイルを作成するか、既存ファイルに追記します:
-- Silver層: ビジネスルールに基づくデータクレンジング
CREATE MATERIALIZED VIEW silver_orders_cleaned (
-- 1995年以降の注文のみ対象
CONSTRAINT recent_order EXPECT (o_orderdate >= '1995-01-01') ON VIOLATION DROP ROW,
-- 一定額以上の注文のみ対象
CONSTRAINT minimum_order_value EXPECT (o_totalprice >= 1000) ON VIOLATION DROP ROW,
-- 優先度が設定されていること
CONSTRAINT has_priority EXPECT (o_orderpriority IS NOT NULL) ON VIOLATION DROP ROW
) AS
SELECT
o_orderkey,
o_custkey,
o_totalprice,
o_orderdate,
o_orderpriority
FROM bronze_orders;
ファイルを実行をクリックして実行します。
実行後、データ品質タブを確認すると:
-
recent_order: 1995年より前の注文が除外された件数 -
minimum_order_value: 1000未満の少額注文が除外された件数
このように、エクスペクテーションは「不正データの除外」だけでなく、「ビジネスルールに基づくフィルタリング」にも活用できます。違いは、除外された件数がメトリクスとして記録されることです。
Step 5: 複数の制約を組み合わせる
実際のパイプラインでは、複数のモードを組み合わせることが多いです。
-- 複合的なデータ品質チェック
CREATE MATERIALIZED VIEW silver_orders_validated (
-- 警告のみ: 監視用(1993年より前の古い注文を検出)
CONSTRAINT warn_old_order EXPECT (o_orderdate >= '1993-01-01'),
-- 違反行を除外: ビジネスルール
CONSTRAINT drop_low_value EXPECT (o_totalprice >= 5000) ON VIOLATION DROP ROW,
-- 失敗: 絶対に許容できない問題
CONSTRAINT fail_invalid_status EXPECT (o_orderstatus IN ('F', 'O', 'P')) ON VIOLATION FAIL UPDATE
) AS
SELECT * FROM bronze_orders;
この例では:
- 古い注文は警告だけ出して処理を続ける(メトリクスで件数を確認可能)
- 5000未満の少額注文は除外して下流に流さない
- ステータスが想定外の値なら失敗させる(samples.tpch.ordersでは発生しないが、実データでは重要)
Step 6: Gold層にもエクスペクテーションを追加
集計テーブルにもエクスペクテーションを追加できます:
-- Gold層: 日別売上集計(品質チェック付き)
CREATE MATERIALIZED VIEW gold_daily_sales_validated (
-- 売上がマイナスになっていないか確認
CONSTRAINT valid_total EXPECT (total_sales >= 0),
-- 注文数が0以上か確認
CONSTRAINT valid_count EXPECT (order_count > 0) ON VIOLATION DROP ROW
) AS
SELECT
o_orderdate AS order_date,
COUNT(*) AS order_count,
SUM(o_totalprice) AS total_sales,
AVG(o_totalprice) AS avg_order_value
FROM silver_orders_cleaned
GROUP BY o_orderdate
ORDER BY o_orderdate;
今回のハンズオンの結果作成されたデータパイプラインは以下のようになります。

なお、右下の設定メニューでDAGオリエンテーションを切り替えることで、グラフの表示方向を変更できます。

よくある品質ルールのパターン
実務でよく使うエクスペクテーションのパターンを紹介します。
NULL チェック
CONSTRAINT not_null_id EXPECT (id IS NOT NULL) ON VIOLATION DROP ROW
CONSTRAINT not_null_name EXPECT (name IS NOT NULL) ON VIOLATION DROP ROW
範囲チェック
-- 数値の範囲
CONSTRAINT valid_age EXPECT (age BETWEEN 0 AND 150) ON VIOLATION DROP ROW
CONSTRAINT positive_amount EXPECT (amount > 0) ON VIOLATION DROP ROW
-- 日付の範囲
CONSTRAINT valid_date EXPECT (event_date >= '2020-01-01') ON VIOLATION DROP ROW
CONSTRAINT not_future EXPECT (event_date <= current_date()) ON VIOLATION DROP ROW
値の妥当性チェック
-- 許可された値のリスト
CONSTRAINT valid_status EXPECT (status IN ('active', 'inactive', 'pending')) ON VIOLATION DROP ROW
-- パターンマッチ
CONSTRAINT valid_email EXPECT (email LIKE '%@%.%') ON VIOLATION DROP ROW
一意性・重複チェック
エクスペクテーションだけでは一意性チェックはできませんが、集計と組み合わせることで検出できます:
-- 重複チェック用のビュー
CREATE MATERIALIZED VIEW duplicate_check (
CONSTRAINT no_duplicates EXPECT (record_count = 1) ON VIOLATION FAIL UPDATE
) AS
SELECT id, COUNT(*) as record_count
FROM source_table
GROUP BY id
HAVING COUNT(*) > 1;
条件付き必須チェック
「Aの場合はBが必須」というルールを表現する方法です。
-- ステータスがactiveなら金額が必須
-- (active以外 OR 金額がある) = activeなのに金額がない場合だけ違反
CONSTRAINT active_needs_amount EXPECT (
status != 'active' OR amount IS NOT NULL
) ON VIOLATION DROP ROW
この書き方は少し分かりにくいので、CASE式を使う方法もあります:
-- より読みやすい書き方
CONSTRAINT active_needs_amount EXPECT (
CASE WHEN status = 'active' THEN amount IS NOT NULL ELSE TRUE END
) ON VIOLATION DROP ROW
エクスペクテーション設計のベストプラクティス
1. 段階的に厳しくする
最初は警告のみで現状を把握し、徐々に厳しくしていく:
Step 1: EXPECT (condition) -- まず監視
Step 2: EXPECT (condition) ON VIOLATION DROP ROW -- 問題なければ除外に変更
Step 3: EXPECT (condition) ON VIOLATION FAIL UPDATE -- 重要なら失敗に
2. 制約名を分かりやすく
-- ❌ 悪い例
CONSTRAINT c1 EXPECT (price > 0)
-- ✅ 良い例
CONSTRAINT positive_price EXPECT (price > 0)
制約名はメトリクスに表示されるため、何をチェックしているか一目で分かる名前にしましょう。
3. 層ごとに役割を分ける
| 層 | エクスペクテーションの役割 |
|---|---|
| Bronze | 最低限のチェック(NULLなど) |
| Silver | データクレンジング(DROP ROW中心) |
| Gold | 最終確認(警告 or FAIL) |
4. FAIL UPDATEは慎重に
FAIL UPDATEを使いすぎると、パイプライン全体が頻繁に止まってしまいます。本当に「1件でも違反があったら全体を止めるべき」ケースにのみ使用しましょう。
まとめ
今日できるようになったこと
- エクスペクテーションの3つのモードを使い分けられる
- 品質メトリクスを確認できる
- よくある品質ルールのパターンを適用できる
エクスペクテーションの価値
- 可視化: 違反件数がメトリクスとして記録される
- 宣言的: 品質ルールがコードとして明示される
- 柔軟: 警告・除外・失敗を使い分けられる
次のステップ
ここまでで、マテリアライズドビューとエクスペクテーションを学びました。しかし、データ量が増えてくると新たな課題が出てきます:
- 「毎回全件処理するのは遅い...」
- 「新しいデータだけを効率的に処理したい」
次の記事 Level 3: Lakeflow SDPの増分処理とストリーミングテーブル では、増分処理の考え方とストリーミングテーブルの使い方を学びます。
参考リンク
- Lakeflow Spark宣言型パイプライン公式ドキュメント
- エクスペクテーションによるデータ品質の管理
- チュートリアル: Lakeflow Spark宣言型パイプラインを使用してETLパイプラインを構築する




