Lakeflow Spark宣言型パイプライン(SDP)とは
Databricksでデータパイプラインを構築する際、皆さんはどのようなツールを使っていますか?
2024年末から2025年にかけて、Databricksは従来の「Delta Live Tables(DLT)」を 「Lakeflow Spark宣言型パイプライン(Spark Declarative Pipelines、略称SDP)」 としてリブランディングしました。名前は変わりましたが、基本的な思想や機能はDLTを継承しており、よりDatabricksの統合データプラットフォーム戦略である Lakeflow の一部として位置づけられるようになりました。
SDPとは何か
Lakeflow Spark宣言型パイプライン(SDP) は、ETL(Extract、Transform、Load)パイプラインを宣言的に定義できるフレームワークです。
従来の命令型のアプローチでは「どうやってデータを処理するか」を詳細に記述する必要がありましたが、SDPでは「何が欲しいか」を定義するだけで、実行エンジンが最適な処理方法を自動的に決定してくれます。
SDPの主な特徴
| 特徴 | 説明 |
|---|---|
| 宣言型アプローチ | SQLやPythonでテーブルの定義を書くだけ。実行順序や依存関係は自動的に解決される |
| データ品質管理 | Expectations(期待値)を使って、データの品質ルールを定義し、違反を検出・処理できる |
| 自動スケーリング | サーバーレスコンピュートにより、ワークロードに応じた自動スケーリング |
| 増分処理 | ストリーミングテーブルにより、新しいデータのみを効率的に処理 |
| CDC対応 |
AUTO CDCにより、Change Data Captureの複雑な処理を簡潔に記述可能 |
| SCD Type 1/2対応 | Slowly Changing Dimensionの履歴管理を組み込みでサポート |
なぜ「宣言型」が重要なのか
従来の命令型パイプラインでは、以下のような課題がありました。
- 依存関係の管理が複雑: どのテーブルがどのテーブルに依存しているか手動で管理
- エラーハンドリングの煩雑さ: 失敗時のリトライや部分的な再実行のロジックを自分で書く必要
- 増分処理の実装が大変: 「前回どこまで処理したか」を自前で追跡
SDPでは、これらを宣言的に定義することで、フレームワーク側が自動的に解決してくれます。
# SDPの宣言的なコード例
from pyspark import pipelines as dp
# これだけでストリーミングテーブルが定義できる
dp.create_streaming_table(
name = "customers_cdc_clean",
expect_all_or_drop = {
"valid_id": "id IS NOT NULL",
"valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"
}
)
DLTからSDPへ - 何が変わったのか
機能面では大きな変更はありませんが、以下の点が更新されています。
- ブランディング: Lakeflowファミリーの一員として再定位
- UIの改善: ETLパイプライン作成のワークフローが統合・簡素化
-
Python API:
pyspark.pipelinesモジュールが利用可能に(dltモジュールも引き続き使用可能) - サーバーレス対応の強化: より多くのリージョンでサーバーレス実行が可能に
既存のDLTパイプラインは引き続き動作しますので、急いで移行する必要はありません。
この記事で学べること
本記事では、公式チュートリアル「チェンジデータキャプチャを使用してETLパイプラインを構築する」をウォークスルーしながら、SDPの以下の機能を実践的に学びます。
- Auto Loaderによる増分データ取り込み - クラウドストレージからのデータ取り込みを自動化
- エクスペクテーションによるデータ品質管理 - 不正データの検出と除外
- AUTO CDCによる変更データ処理 - INSERT/UPDATE/DELETEを自動的にマテリアライズ
- SCD Type 2による履歴管理 - 変更履歴を追跡するテーブルの構築
- ジョブスケジューリング - パイプラインの定期実行設定
メダリオンアーキテクチャ(Bronze → Silver → Gold)に沿ったEnd-to-Endのパイプラインを構築していきますので、SDPの全体像を把握したい方はぜひ最後までお付き合いください。
それでは、Databricks Free Editionを使用して実際にチュートリアルを進めていきましょう。
ステップ1: パイプラインの作成
サイドメニューの +新規 > ETLパイプライン を選択します。

パイプライン作成画面が開きます。上のボックスにパイプライン名Pipelines with CDC tutorialを入力します。

必要に応じてカタログとスキーマを変更します。パイプラインが実行されることで生成されるテーブルなどがこちらに格納されます。今回はFree Editionのデフォルトworkspace.defaultのままとします。
空のファイルで開始を選択し、パイプラインを構成するコードファイルを格納するフォルダを選択します。


Lakeflowパイプラインエディターが表示されます。

ステップ2: サンプルデータの作成
新たにコードを追加してサンプルデータ生成ロジックを記述します。エディタ左のパイプライン構成ペインで追加 > 探索をクリックします。探索ノートブックはパイプラインの実行に含まれません。

以下のコードを記述します。
%pip install faker
# パイプラインのステップ1で使用したカタログとスキーマに合わせて更新してください
catalog = "workspace"
schema = dbName = db = "default"
spark.sql(f'USE CATALOG `{catalog}`')
spark.sql(f'USE SCHEMA `{schema}`')
spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`')
volume_folder = f"/Volumes/{catalog}/{db}/raw_data"
try:
dbutils.fs.ls(volume_folder+"/customers")
except:
print(f"フォルダが存在しないため、{volume_folder} にデータを生成します...")
from pyspark.sql import functions as F
from faker import Faker
from collections import OrderedDict
import uuid
fake = Faker()
import random
# ダミーデータ生成用UDF
fake_firstname = F.udf(fake.first_name)
fake_lastname = F.udf(fake.last_name)
fake_email = F.udf(fake.ascii_company_email)
fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
fake_address = F.udf(fake.address)
operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)
# 10万件のダミーデータを生成
df = spark.range(0, 100000).repartition(100)
df = df.withColumn("id", fake_id())
df = df.withColumn("firstname", fake_firstname())
df = df.withColumn("lastname", fake_lastname())
df = df.withColumn("email", fake_email())
df = df.withColumn("address", fake_address())
df = df.withColumn("operation", fake_operation())
df_customers = df.withColumn("operation_date", fake_date())
# JSON形式で保存
df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")
ステップ3: Auto Loaderを用いた増分データ取り込み
今度はパイプラインのロジックなので、最初に作成されたmy_transformation.pyにコードを記載します。なお、パイプラインロジックはノートブックではなく.pyファイルに記載することが推奨となっています。
from pyspark import pipelines as dp
from pyspark.sql.functions import *
# 使用しているカタログとスキーマ名に置き換えてください:
path = "/Volumes/workspace/default/raw_data/customers"
# ブロンズテーブル(顧客データの着信ゾーンからの増分取り込み用)を作成
dp.create_streaming_table("customers_cdc_bronze", comment="クラウドオブジェクトストレージのランディングゾーンから増分取り込まれる新規顧客データ")
# Append Flowを作成し、生データをブロンズテーブルに取り込む
@dp.append_flow(
target = "customers_cdc_bronze",
name = "customers_bronze_ingest_flow"
)
def customers_bronze_ingest_flow():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.load(f"{path}")
)
ここで一度パイプラインを実行します。右上のパイプラインを実行をクリックします。コードからパイプラインが構成されて処理が実行されます。

処理が完了すると、処理結果、エラーや警告の有無、処理レコード数などを確認できます。

ステップ4: クリーンアップとデータ品質を追跡するためのエクスペクテーション
処理対象データに対するエクスペクテーション(期待)を定義することでデータ品質を担保することができます。新たな変換ファイルを追加して以下のロジックを記述します。expect_all_or_dropで記載している内容がエクスペクテーションです。
from pyspark import pipelines as dp
from pyspark.sql.functions import *
dp.create_streaming_table(
name = "customers_cdc_clean",
expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
)
@dp.append_flow(
target = "customers_cdc_clean",
name = "customers_cdc_clean_flow"
)
def customers_cdc_clean_flow():
return (
spark.readStream.table("customers_cdc_bronze")
.select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
)
パイプラインを実行するとエクペクテーションに合格したのかどうかが確認できます。


ステップ5: AUTO CDCフローを使用して顧客テーブルをマテリアライズする
ここで、CDCフローを用いて変更の結果をマテリアライズ(具現化)します。ここでのマテリアライズはマテリアライズドビューを作るという意味ではなく変更履歴をまとめ上げてテーブルとして具現化することを指しています。
from pyspark import pipelines as dp
from pyspark.sql.functions import *
dp.create_streaming_table(name="customers", comment="クリーンでマテリアライズされた顧客テーブル")
dp.create_auto_cdc_flow(
target="customers", # 顧客テーブル(マテリアライズ対象)
source="customers_cdc_clean", # 入力されるCDCデータ
keys=["id"], # 行のアップサートに使用するキー
sequence_by=col("operation_date"), # operation_dateで重複排除し、最新値を取得
ignore_null_updates=False,
apply_as_deletes=expr("operation = 'DELETE'"), # DELETE条件
except_column_list=["operation", "operation_date", "_rescued_data"], # 除外するカラム
)
変更履歴から構成されるcustomersテーブルが作成されます。

ステップ6: SCDタイプ2による更新履歴の追跡
SCDタイプ2とは、データの変更履歴を全て保持するテーブルを指します。
from pyspark import pipelines as dp
from pyspark.sql.functions import *
# テーブルを作成(顧客のSCD2履歴用)
dp.create_streaming_table(
name="customers_history", comment="顧客のスローリー・チェンジング・ディメンションタイプ2"
)
# すべての変更をSCD2として保存
dp.create_auto_cdc_flow(
target="customers_history",
source="customers_cdc_clean",
keys=["id"],
sequence_by=col("operation_date"),
ignore_null_updates=False,
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "operation_date", "_rescued_data"],
stored_as_scd_type="2",
) # SCD2を有効化し、個別の更新を保存
パイプラインを実行すると、データレコードがいつまで有効だったのかの日時が含まれる履歴テーブルが生成されます。


ステップ7: 更新頻度の多い顧客を追跡するマテリアライズドビューの作成
最後に更新頻度を集計した結果を格納するマテリアライズドビューを作成します。
from pyspark import pipelines as dp
from pyspark.sql.functions import *
@dp.table(
name = "customers_history_agg",
comment = "顧客履歴の集計"
)
def customers_history_agg():
return (
spark.read.table("customers_history")
.groupBy("id")
.agg(
count("address").alias("address_count"),
count("email").alias("email_count"),
count("firstname").alias("firstname_count"),
count("lastname").alias("lastname_count")
)
)
これで一連の処理を行うデータパイプラインを構成することができました。しかし、これでは毎回手動実行する必要があります。スケジュール実行されるようにジョブを作成しましょう。
ステップ8: ETLパイプラインを実行するジョブの作成
パイプライン画面右上のスケジュールをクリックすると、このパイプラインの実行スケジュールを設定することができます。
シンプルな周期指定からより詳細な時刻設定が可能です。さらには、ストレージにファイルが到着したことをトリガーにすることもできます。
これで、データパイプラインの作成からジョブ作成までをカバーできました。パイプライン実行の結果生成されるテーブルはカタログエクスプローラからも確認できます。
Free Editionでも試せますので、ぜひトライしてみてください!












