1. はじめに: Delta Live Tables (DLT) でデータ品質管理できるの?
Databricks の Delta Live Tables (DLT) ってありますよね。
「Delta Live Tables (DLT) を使えば、データ品質管理ができる」
公式ドキュメントを見ていても「具体的にどうやって?」「@dlt.expect
と書くだけで、本当に便利になるの?」と、正直なところ、私はなかなか具体的なイメージが湧きませんでした。
そこで、「百聞は一見に如かず。まずは手を動かしてみよう!」と思い立ち、簡単なデータパイプラインで DLT のデータ品質管理機能を実際に試してみることにしました。
結論から言うと、この試みは正解でした。
コードのどこに何を書けば良いのか、品質チェックの結果がどのように可視化されるのか、そしてルール違反のデータがどう扱われるのか。これまで漠然としていた DLT のデータ品質管理が、一連の体験を通して「なるほど、こういうことか!」と、イメージできるようになりました。
この記事では、「DLT の品質管理がピンとこない」と感じている方に向けて、私が試した具体的な手順と、そこから得られた気づきを共有します。
2. データ品質を測る「6つの指標」
データ品質は多角的な側面から評価されます。ここでは、DLT で管理しやすい6つの指標を紹介します。
品質指標 | 説明 |
---|---|
1. 信頼性 (Reliability) | データの出所が明確で、処理の追跡が可能か。 |
2. 完全性 (Completeness) | 必要なデータ項目がすべて揃っているか。 |
3. 一貫性 (Consistency) | データが組織内で定められた基準やフォーマットに準拠しているか。 |
4. 統合性 (Integrity) | 複数のデータセット間の関連性が保たれているか。 |
5. 適時性・最新性 (Timeliness/Freshness) | データが適切なタイミングで更新され、鮮度が保たれているか。 |
6. 一意性 (Uniqueness) | データが重複なく、一意に識別できるか。 |
3. 実践!DLTでデータ品質パイプラインを構築する
ECサイトの顧客データと注文データを処理するパイプラインを例に、DLTによるデータ品質管理の実装方法を見ていきましょう。
3.1 シナリオとアーキテクチャ
- シナリオ: 外部ソースから顧客データと注文データを取得し、品質チェックを経て、最終的に顧客ごとの注文集計データを作成します。
- アーキテクチャ: Bronze(生データ)、Silver(クレンジング済みデータ)、Gold(集計済みデータ)のメダリオンアーキテクチャを採用します。
3.2 DLTパイプラインのサンプルコード (Python)
このコードは、Databricksノートブック上でDLTパイプラインとして実行することを前提としています。
import dlt
from pyspark.sql.functions import col, expr, row_number
from pyspark.sql.session import SparkSession
from pyspark.sql.window import Window
# --- 1. Bronze Layer: 生データの読み込み ---
@dlt.table(
name="raw_customers",
comment="【1. 信頼性】ソースシステムから取り込んだ生の顧客データ"
)
def raw_customers():
"""
データの取得元がこのコードブロックで明示されるため、「信頼性」が担保されます。
サンプルデータには、品質チェックで検出されるべき不正なデータ(NULL、重複など)を含んでいます。
"""
spark = SparkSession.builder.getOrCreate()
data = [
(1, "山田 太郎", "東京都渋谷区", "taro@example.com", "2023-10-01"),
(2, "佐藤 花子", "大阪府大阪市", "hanako@example.com", "2023-10-02"),
(3, "鈴木 一郎", None, "ichiro@example.com", "2023-10-03"), # 住所がNULL
(4, "高橋 四郎", "福岡県福岡市", "shiro", "2023-10-04"), # email形式が不正
(5, "田中 次郎", "東京都渋谷区", "jiro@example.com", "2023-10-05"),
(1, "山田 太郎", "東京都渋谷区", "taro@example.com", "2023-10-06") # 顧客ID=1が重複
]
columns = ["customer_id", "name", "address", "email", "created_at"]
return spark.createDataFrame(data, columns)
@dlt.table(
name="raw_orders",
comment="【1. 信頼性】ソースシステムから取り込んだ生の注文データ"
)
def raw_orders():
spark = SparkSession.builder.getOrCreate()
data = [
(101, 1, 5000, "2023-11-01"),
(102, 2, 3000, "2023-11-02"),
(103, 1, 2000, "2023-11-03"),
(104, 99, 1000, "2023-11-04") # 存在しない顧客ID(99)
]
columns = ["order_id", "customer_id", "amount", "order_date"]
return spark.createDataFrame(data, columns)
# --- 2. Silver Layer: データ品質チェックとクレンジング ---
@dlt.table(
name="clean_customers",
comment="データ品質ルールを適用し、クレンジングされた顧客データ"
)
# 【2. 完全性】必須項目(ID, 住所)がNULLの行は処理から除外(drop)
@dlt.expect_or_drop("completeness_check", "customer_id IS NOT NULL AND address IS NOT NULL")
# 【3. 一貫性】emailが正しい形式かチェック(違反データは警告)
@dlt.expect("consistency_check_email", "email LIKE '%@%.%'")
# 【5. 最新性】データが一定の鮮度を保っているかチェック
@dlt.expect("freshness_check", "to_date(created_at) > '2023-01-01'")
def clean_customers():
return dlt.read("raw_customers")
# --- 3. Gold Layer: データ統合とビジネスロジックの適用 ---
@dlt.table(
name="customer_summary",
comment="最終的な顧客マスタ。重複を排除し、注文情報を付加"
)
def customer_summary():
"""
【6. 一意性】と【4. 統合性】をこのテーブルで担保します。
"""
customers = dlt.read("clean_customers")
orders = dlt.read("raw_orders")
# 【6. 一意性】customer_idの重複を排除(created_atが最新のレコードを採用)
window = Window.partitionBy("customer_id").orderBy(col("created_at").desc())
unique_customers = customers.withColumn("row_num", row_number().over(window)) \
.filter(col("row_num") == 1) \
.drop("row_num")
# 【4. 統合性】顧客データと注文データを内部結合。注文に紐づく顧客が必ず存在することを保証
customer_orders = unique_customers.join(orders, "customer_id", "inner") \
.groupBy("customer_id", "name", "address") \
.agg(expr("sum(amount) as total_amount"),
expr("count(order_id) as order_count"))
return customer_orders
3.3 コード解説:6つの品質指標をどう実装したか?
-
信頼性:データソースの明確化
@dlt.table
で定義された関数 (raw_customers
,raw_orders
) がデータの取得元となり、コード自体がデータの来歴を文書化します。 -
完全性:必須データの欠損チェック
@dlt.expect_or_drop
を使い、「customer_id
とaddress
がNULLであってはならない」というルールを定義。違反したレコードは後続の処理から自動的に除外(DROP) されます。@dlt.expect_or_drop("completeness_check", "customer_id IS NOT NULL AND address IS NOT NULL")
-
一貫性:データフォーマットの統一
@dlt.expect
を使い、「email
列は@
を含む」というルールを定義。違反したレコードはテーブルに残りますが、イベントログに警告(ALLOW) として記録され、品質を監視できます。@dlt.expect("consistency_check_email", "email LIKE '%@%.%'")
-
統合性:データ間の整合性確保
顧客テーブルと注文テーブルをinner join
で結合します。これにより、注文に紐づく顧客が必ず顧客テーブルに存在するレコードのみが残り、参照整合性が保たれます。unique_customers.join(orders, "customer_id", "inner")
-
適時性・最新性:データの鮮度維持
DLTパイプラインをスケジュール実行することで適時性を、@dlt.expect
で日付をチェックすることで最新性を検証します。@dlt.expect("freshness_check", "to_date(created_at) > '2023-01-01'")
-
一意性:データの重複排除
Sparkのウィンドウ関数を使い、customer_id
でデータをグループ化し、最新のレコード1件のみを抽出します。window = Window.partitionBy("customer_id").orderBy(col("created_at").desc()) unique_customers = customers.withColumn("row_num", row_number().over(window)) \ .filter(col("row_num") == 1)
4. 実行結果を読み解く:DLTはデータ品質をどう可視化するか
パイプラインを実行すると、処理の流れとデータ品質チェックの結果がグラフィカルに表示されます。
4.1 パイプライングラフとレコード数の変化
-
raw_customers
(6件) →clean_customers
(5件)-
completeness_check
(住所がNULL)に違反した顧客ID=3のレコードが1件削除されました。
-
-
clean_customers
(5件) +raw_orders
(4件) →customer_summary
(2件)- クレンジング後の顧客データと注文データを結合し、集計した結果です。
4.2 データ品質メトリクスの確認
clean_customers
テーブルを選択すると、品質チェックの結果が確認できます。
-
consistency_check_email
(ALLOW): 失敗レコード: 1- メール形式が不正な顧客ID=4のレコードです。ルールが
ALLOW
(許可)のため、テーブルには残りますが品質違反として記録されます。
- メール形式が不正な顧客ID=4のレコードです。ルールが
-
completeness_check
(DROP): 失敗レコード: 1- 住所がNULLの顧客ID=3のレコードです。ルールが
DROP
(削除)のため、テーブルから除外されました。
- 住所がNULLの顧客ID=3のレコードです。ルールが
4.3 最終的なGoldテーブルの検証
最終的なcustomer_summary
テーブルが2行になる理由は以下の通りです。
-
顧客ID=1 (山田 太郎): 重複レコードが1件に絞られ、2件の注文が集計されました (
total_amount: 7000
,order_count: 2
)。 -
顧客ID=2 (佐藤 花子): 1件の注文が集計されました (
total_amount: 3000
,order_count: 1
)。
【最終結果に含まれないデータとその理由】
-
顧客ID=3:
completeness_check
でSilverテーブルから削除されました。 -
顧客ID=4, 5: 注文データが存在しないため、
inner join
で除外されました。 -
注文ID=104 (顧客ID=99): 対応する顧客が存在しないため、
inner join
で除外されました。
この結果から、DLTが定義通りのデータ品質ルールと変換ロジックを正確に実行していることがわかります。
5. 【重要】コストを抑えるための必須設定
DLTパイプラインを作成する際、「パイプラインモード」 の設定に注意してください。
- 開発 (Development): パイプラインを停止するまでクラスターが起動し続けます(コスト高)。
- プロダクション (Production): 実行時のみクラスターが起動し、完了後に自動で停止します(コスト低)。
注意: デフォルトは「開発」モードです。検証後や本番運用では、必ず「プロダクション」モードに切り替えることを推奨します。
6. まとめ:試してわかった、DLTの本当の価値
はじめに、「DLTのデータ品質管理は、ドキュメントを読むだけではイメージが湧きにくい」というお話をしました。
今回、実際に手を動かしてパイプラインを構築してみて、私が最も価値を感じたのは、DLTがデータ品質管理を「仕組み化」し、「属人性を排除」してくれる点です。
これまで個人の手作業に頼りがちだった品質チェックが、@dlt.expect
という宣言的なコードとしてパイプラインに組み込まれる。これにより、「誰が実行しても、同じ基準で、自動的に」 データ品質が保証されるようになります。
これは単なる機能ではなく、データ基盤を安定運用するための強力な「仕組み」そのものです。
もし、DLTの品質管理にピンと来ていない方がいらっしゃれば、ぜひこのブログを参考に一度パイプラインを動かしてみてください。きっと、ご自身のデータパイプラインが、より堅牢で信頼性の高い「仕組み」に変わるイメージが湧くはずです!
付録:ハンズオン!サンプルコードを動かしてみよう
ステップ1:ノートブックを準備する
- Databricksワークスペースで
[+] 作成
>[ノートブック]
を選択します。 - ノートブックに名前(例:
dlt_quality_pipeline
)を付け、デフォルト言語がPython
であることを確認します。 - この時点ではクラスターにアタッチする必要はありません。
ステップ2:コードを貼り付ける
- 作成したノートブックの最初のセルに、上記「3.2」のサンプルコードをすべて貼り付けます。
ステップ3:DLTパイプラインを作成・設定する
- 左のナビゲーションバーで
[ジョブとパイプライン]
>[作成]
>[ETLパイプライン]
を選択します。 - 以下の項目を設定します。
-
パイプライン名: 管理しやすい名前(例:
データ品質デモ
)を入力します。-
パイプラインモード:
トリガー
を選択します(手動実行モード)。
-
パイプラインモード:
-
ソースコード:
[パスを選択]
をクリックし、ステップ1で作成したノートブックを選択します。 -
配信先: Unity Catalogを選択。
- カタログ:テーブル書き出しをしたいカタログを選択
- スキーマ:テーブル書き出しをしたいスキーマを選択(例:
dlt_demo_db
)
-
コンピュート:
クラスターポリシー
を選択します。
-
パイプライン名: 管理しやすい名前(例:
ステップ4:パイプラインを実行する
- 設定画面右上の
[開始]
ボタンをクリックします。 - DLTがクラスターを起動し、パイプラインの実行を開始します(数分かかります)。
ステップ5:結果を確認する
- 実行完了後、画面に表示されるデータフローグラフやデータ品質メトリクスを確認します。
- SQLエディタや新しいノートブックから、ステップ3で設定したターゲットスキーマ内のテーブル(
clean_customers
やcustomer_summary
)をクエリし、データが意図通りに処理されていることを確認します。SELECT * FROM dlt_demo_db.customer_summary