こちらのもくもく会でいただいた質問に対する回答です。
はじめに
Lakeflow SDP (Spark Declarative Pipelines) について、よくある質問をQ&A形式でまとめました。Lakeflow SDPはDatabricksのETLパイプライン構築フレームワークで、以前はDelta Live Tables (DLT) として知られていたものの進化版です。
本記事では、SQLとPythonの使い分け、監査列の作成方法、作成可能なオブジェクトの種類、エクスペクテーションの通知について解説します。
Q1: SQLとPythonの使い分けについて
質問
Lakeflow SDPではSQLにおける複雑な処理は難しいですか? エクスペクテーションなどはSQL/Python両方でできると思いますが、簡単な処理であればSQL、複雑な処理をさせる場合はPythonなどといった使い分けはありますか?
回答
はい、SQLで簡単な処理、Pythonで複雑な処理という使い分けは実際に推奨されるアプローチです。SDPでは「簡潔な宣言的ETLはSQL、制御や外部ライブラリ・動的生成が必要ならPython」という使い分けが一般的です。同一パイプライン内でSQLファイルとPythonファイルを混在させることが公式にサポートされています。
エクスペクテーションにおける機能差
エクスペクテーション自体はSQL/Python両方で利用可能ですが、機能差があります。
SQLの場合:
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL) ON VIOLATION DROP ROW
)
Pythonの場合 (追加機能あり):
valid_rules = {
"valid_count": "count > 0",
"valid_page": "page_id IS NOT NULL"
}
@dp.table
@dp.expect_all_or_drop(valid_rules) # 複数ルールを一括適用
def cleaned_data():
...
重要な差: SQLとPythonの両方が単一データセットに複数のエクスペクテーションをサポートしていますが、複数のエクスペクテーションをグループ化して一括アクションを指定できるのはPythonのみです。
Pythonが有利な複雑処理の例
1. 再帰的な階層データ処理
再帰的CTE (Common Table Expression) はDatabricks SQLでは利用可能ですが、Lakeflow Declarative Pipelinesでは現時点で利用できません。そのためSQLは複雑で直感的ではなく、特にPythonと比較してこのユースケースには柔軟性に欠けます。
2. ロジックの再利用性・テスト容易性
Pythonではすべてのロジックを関数に隠蔽でき、コードをクリアでシンプルに見せることができます。ロジックを分離して実装することで、関数のユニットテストが可能になり、他のパイプライン内で再利用もできます。
3. 動的なルール管理
from pyspark import pipelines as dp
from pyspark.sql.functions import col
def get_rules(tag):
"""ルールテーブルからタグに一致するルールを取得"""
df = spark.read.table("rules").filter(col("tag") == tag).collect()
return {row['name']: row['constraint'] for row in df}
@dp.table
@dp.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
return (
spark.read.format('csv')
.option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
推奨される使い分け
| 処理タイプ | 推奨言語 | 理由 |
|---|---|---|
| シンプルなSELECT/JOIN/集計 | SQL | 宣言的で可読性が高い |
| 基本的なエクスペクテーション | SQL | 十分な機能がある |
| 階層データ処理 | Python | 再帰CTEが未サポート |
| 動的ルール適用 | Python | テーブルからルールを動的読込可能 |
| 複雑なビジネスロジック | Python | 関数分離・テスト・再利用が容易 |
| メタプログラミング | Python | SQLでは困難 |
Databricksは、シンプルな変換にはSQLスクリプトを使い、より複雑なロジックが必要なものにはPythonを使うハイブリッドアプローチが有効としています。
実務上は、チームのスキルセットやメンテナンス性も考慮して、基本はSQLで書きつつ、SQLでは書きにくい部分だけPythonに切り出すという方針が現実的です。
1つのノートブックやファイルは「SQLのみ」または「Pythonのみ」です(同一ノートブック内の混在不可)。ただし、パイプラインは複数ノートブックで構成できるため、取り込みはPython、整形・公開はSQL…など言語を分けて併用できます。
実務での典型パターン
- 取り込み・複雑な前処理(正規化、パース、外部ライブラリ活用、動的DAG生成など)はPython。
- モデリング・集計・公開(マテビュー/ストリーミングテーブル)や多くのエクスペクテーションはSQLで宣言的に。
- エクスペクテーションは「基本SQL、複数エクスペクテーションの一括制御が必要な箇所のみPython」というハイブリッドが扱いやすいです。
ポイント: まずはSQLで書けるところまで書き、宣言的に表現しにくくなった箇所や外部ライブラリが必要な箇所だけPythonに切り出すのが運用・保守の観点でもおすすめです。
Q2: ブロンズテーブルの監査列について
質問
ブロンズテーブルを作成するときに、監査列を作りたい場合は、従来通りメタデータ (ファイルパスや取り込み時刻) から抽出することも可能ですか? それとも、いい感じにやってくれるのでしょうか?
回答
結論から言うと、従来通り _metadata カラムを使って手動で監査列を作成する必要があります。SDPには自動的に付与してくれる機能はありません。
_metadata カラムで取得できる情報
_metadataはSTRUCT型で、以下の情報を含みます:
| フィールド | 説明 |
|---|---|
file_path |
ファイルの完全パス |
file_name |
ファイル名 |
file_size |
ファイルサイズ |
file_block_start |
ブロック開始位置 |
file_block_length |
ブロック長 |
file_modification_time |
ファイル更新日時 |
Lakeflow SDP での実装例
Python:
from pyspark.sql.functions import col, current_timestamp
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("s3://my-bucket/raw/orders/")
.selectExpr("*", "_metadata") # ← メタデータを明示的に含める
.withColumn("source_file_path", col("_metadata.file_path"))
.withColumn("source_file_name", col("_metadata.file_name"))
.withColumn("source_file_mod_time", col("_metadata.file_modification_time"))
.withColumn("ingested_at", current_timestamp()))
# 以降は dlt/dp.table などの宣言で bronze を作成
SQL (read_files使用):
CREATE OR REFRESH STREAMING TABLE bronze_orders AS
SELECT
o.*,
_metadata.file_path AS source_file_path,
_metadata.file_name AS source_file_name,
_metadata.file_modification_time AS source_file_mod_time,
current_timestamp() AS ingested_at
FROM read_files(
's3://my-bucket/raw/orders/',
format => 'csv'
);
"いい感じにやってくれる" 部分
Lakeflow SDPが自動管理してくれるのは監査列ではなく、以下の部分です:
| 自動管理される項目 | 説明 |
|---|---|
| スキーマ管理 | スキーマ推論・進化を自動処理 |
| チェックポイント | どのファイルを処理済みか追跡 |
| exactly-once保証 | 重複取り込み防止 |
| リトライ | 失敗時の自動リカバリ |
つまり、何を取り込んだかのメタデータ (監査列) は明示的に指定する必要がある一方で、どこまで取り込んだかの状態管理は自動という設計です。
推奨パターン
ブロンズテーブルの監査列として一般的に追加するのは:
.select(
"*",
# ソースファイル情報
col("_metadata.file_path").alias("_source_file"),
col("_metadata.file_modification_time").alias("_file_modified_at"),
# パイプライン処理情報
current_timestamp().alias("_ingested_at"),
# オプション: レスキューされた不正データ
"_rescued_data"
)
監査列の定義は手動ですが、一度書けばパイプラインが自動で差分処理してくれるので、運用負荷は従来のSparkジョブより大幅に軽減されます。
Q3: 作成できるオブジェクトの種類とメトリクスビュー
質問
SDPで作成できるテーブル・ビューは、MVとSTのみでしょうか? メトリクスビューを作成したいと考えているのですが、SDPで作れますか?
回答
結論を先にお伝えすると、SDPでメトリクスビューは作成できません。これは別の機能です。SDPで公開できるデータセットは主にストリーミングテーブル(ST) と マテリアライズビュー(MV) です。パイプライン内でのみ使う一時的なビューも定義できます(外部からは参照不可)。
「メトリクスビュー」は専用タイプがあるわけではないため、MV(必要に応じてST)として定義して公開するのが標準的なやり方です。
SDPで作成できるオブジェクト
パイプラインは、1つ以上のフロー、ストリーミングテーブル、マテリアライズビュー、シンクを含むことができます。
| オブジェクト | 説明 |
|---|---|
| ストリーミングテーブル(ST) | ストリーミングターゲット。Append/AUTO CDCフローを受け入れる |
| マテリアライズビュー(MV) | バッチターゲット。事前計算された結果を保持 |
| ビュー | パイプライン内でのみ参照可能な一時ビュー (カタログに公開されない) |
| シンク | 外部ターゲット (Kafka, Event Hubs, 外部Deltaテーブル等) |
メトリクスビューとは
メトリクスビューはSDPとは別のUnity Catalog機能です。
メトリクスビューは複雑なビジネスロジックを集中定義に抽象化し、組織がKPIを一度定義してダッシュボード、Genieスペース、アラートなどのレポートツール全体で一貫して使用できるようにします。メトリクスビューはYAML形式で定義され、Unity Catalogに登録されます。SQLまたはCatalog Explorer UIを使用して作成できます。
作成方法:
- Databricks SQLエディタで
CREATE VIEW ... WITH METRICS構文を使用 - Catalog Explorer UIから作成
- Databricks Assistantで生成
作成例:
CREATE VIEW orders_metric_view
WITH METRICS
$$
version: 1.1
source:
table: samples.tpch.orders
dimensions:
- name: Order Month
expr: DATE_TRUNC('MONTH', o_orderdate)
- name: Order Status
expr: o_orderstatus
measures:
- name: Order Count
expr: COUNT(*)
- name: Total Revenue
expr: SUM(o_totalprice)
- name: Total Revenue per Customer
expr: SUM(o_totalprice) / COUNT(DISTINCT o_custkey)
$$;
推奨アーキテクチャ
SDPとメトリクスビューを組み合わせる場合の構成:
[SDP パイプライン]
↓
Bronze (ST) → Silver (ST/MV) → Gold (MV)
↓
[Unity Catalog]
↓
[Metric View] ← Databricks SQL で作成
↓
[Dashboard / Genie / Alert]
補足
- SDPはパイプラインのイベントログ(行数、期待値違反件数などの実行メトリクス)をDeltaテーブルとして出力するため、これを集計してMVとして可視化できます。
- イベントログはパス
dbfs:/pipelines/<pipeline-id>/system/eventsから参照可能です(従来DLTの命名が一部残っています)。
例:イベントログからメトリクスMVを作成(SQL)
CREATE OR REFRESH MATERIALIZED VIEW pipeline_metrics AS
SELECT
details:flow_progress.metrics.num_output_rows AS num_output_rows,
details:flow_progress.metrics.num_dropped_rows AS num_dropped_rows,
details:flow_progress.metrics.num_failed_rows AS num_failed_rows,
timestamp AS update_ts
FROM delta.`dbfs:/pipelines/<pipeline-id>/system/events`
WHERE event_type = 'flow_progress';
つまり:
- SDPでデータパイプラインを構築し、Gold層のMVを作成
- そのMVをソースとしてメトリクスビューをDatabricks SQLで別途作成
メトリクスビューは「BI/分析向けのセマンティックレイヤー」として位置づけられており、ETL処理を担うSDPとは役割が異なります。SDPで作成したテーブル/ビューを参照する形でメトリクスビューを定義するのが想定された使い方です。
Q4: エクスペクテーション警告の通知
質問
エクスペクテーションの警告はジョブのエラー時のようにメールやslack等にアラートを飛ばせるんでしょうか?可能であれば手順もお聞きしたいです。
回答
可能です。エクスペクテーションの結果はSDPのパイプラインイベントログに記録されるため、そのログをDatabricks SQLでクエリしてアラート(メール)を設定できます。Slackなどへの通知は、軽量なジョブ(ノートブック/Pythonファイル)で同じ指標を判定してWebhooksにPOSTする形が実務的です。
仕組みの要点
- SDPはパイプライン実行時のメトリクス(行数、ドロップ/失敗件数、エクスペクテーションのメトリクス)をイベントログに出力します。ログはDeltaテーブルとして参照でき、レポートや通知の基盤になります。
- イベントログの既定パスは
dbfs:/pipelines/<pipeline-id>/system/eventsです。 - エクスペクテーションのアクションは「warn(書き込み継続)」「drop(違反行をドロップ)」「fail(更新失敗)」の3種です。通知はwarn/dropでも可能ですが、failにするとパイプライン更新自体が失敗します。
手順A(メール通知:Databricks SQL Alerts)
- クエリを作成
イベントログから違反件数(例:ドロップ/失敗行)を抽出・集計します。-- 例:フロー進捗イベントからドロップ/失敗行のメトリクスを取得 SELECT timestamp AS update_ts, details:flow_progress.metrics.num_output_rows AS num_output_rows, details:flow_progress.metrics.num_dropped_rows AS num_dropped_rows, details:flow_progress.metrics.num_failed_rows AS num_failed_rows FROM delta.`dbfs:/pipelines/<pipeline-id>/system/events` WHERE event_type = 'flow_progress'; - アラートを作成
Databricks SQLの「アラート」を開き、上記クエリを条件付きで監視(例:num_failed_rows > 0やnum_dropped_rows / num_output_rows > 0.01)。メール宛先と実行間隔を設定します。SDP(旧DLT)ログはメールアラートの対象として活用可能です。
手順B(Slack通知:イベントフック)
アプローチ: SDPイベントフック(Public Preview)で、パイプラインのイベントログ永続化時にPythonコールバックを実行し、Slackや外部メールAPIへ通知する。
特徴: パイプラインのイベントに対して直接フックでき、リアルタイムに通知できる。Pythonのみ対応。STABLEなイベントのみトリガ。Hooksは非同期実行だが同時並行せず逐次処理。シークレットでトークンを安全管理。
使いどころ: 「即時・パイプライン内で完結」させたいとき(例: 失敗行>0やドロップ率>閾値で即時にSlackへ)。
手順:
- DatabricksシークレットにSlackトークンやWebhook情報を格納(scope/key)。
- パイプラインのPythonソースに
@dp.on_event_hookを定義し、flow_progressイベントのメトリクス(例:num_failed_rows,num_dropped_rows,num_output_rows)を判定して送信。イベント別にフィルタ可能。 - 連続失敗上限
max_allowable_consecutive_failuresを必要に応じて設定(許容回数超過で自動的にフック無効化)。
実装例
from pyspark import pipelines as dp
import requests
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>") # Secretsで取得
SLACK_URL = "https://slack.com/api/chat.postMessage"
CHANNEL_ID = "<channel-id>"
HEADERS = {"Content-Type": "application/json", "Authorization": f"Bearer {API_TOKEN}"}
@dp.on_event_hook # 必要なら max_allowable_consecutive_failures=3 など
def notify_expectations(event: dict):
if event.get("event_type") != "flow_progress":
return
metrics = event.get("details", {}).get("flow_progress", {}).get("metrics", {})
num_failed = metrics.get("num_failed_rows", 0)
num_dropped = metrics.get("num_dropped_rows", 0)
num_output = metrics.get("num_output_rows", 0)
# 例: failed>0 または dropped率>=1%で通知
breach = (num_failed and num_failed > 0) or (num_output and (num_dropped / max(num_output, 1) >= 0.01))
if breach:
text = f"SDP Expectations alert: failed={num_failed}, dropped={num_dropped}, output={num_output}"
requests.post(SLACK_URL, headers=HEADERS, json={"channel": CHANNEL_ID, "text": text})
まとめ
| 項目 | ポイント |
|---|---|
| SQL vs Python | 基本はSQL、複雑処理はPython。同一パイプラインで混在可能 |
| 監査列 |
_metadataカラムから手動で抽出が必要。自動付与はなし |
| 作成可能オブジェクト | ST, MV, ビュー, シンク。メトリクスビューは別機能 |
| エクスペクテーションの通知 | Databricks SQLのアラート、イベントフックで通知を構成可能 |
Lakeflow SDPは宣言的なETLパイプライン構築に特化しており、セマンティックレイヤー (メトリクスビュー) やBI機能とは明確に役割分担されています。それぞれの機能の特性を理解して、適切に組み合わせることで効率的なデータ基盤を構築できます。
参考リンク
- Lakeflow Spark宣言型パイプライン
- パイプラインエクスペクテーションによるデータ品質の管理
- ファイルメタデータ列
- Unity Catalogメトリクスビュー
- パイプラインのエクスペクテーションによるデータ品質の管理
- イベントフックを使用してパイプラインのカスタムモニタリングを定義する