3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Lakeflow Spark宣言型パイプライン(SDP)チュートリアルのウォークスルー

Posted at

Lakeflow Spark宣言型パイプライン(SDP)とは

Databricksでデータパイプラインを構築する際、皆さんはどのようなツールを使っていますか?

2024年末から2025年にかけて、Databricksは従来の「Delta Live Tables(DLT)」を 「Lakeflow Spark宣言型パイプライン(Spark Declarative Pipelines、略称SDP)」 としてリブランディングしました。名前は変わりましたが、基本的な思想や機能はDLTを継承しており、よりDatabricksの統合データプラットフォーム戦略である Lakeflow の一部として位置づけられるようになりました。

SDPとは何か

Lakeflow Spark宣言型パイプライン(SDP) は、ETL(Extract、Transform、Load)パイプラインを宣言的に定義できるフレームワークです。

従来の命令型のアプローチでは「どうやってデータを処理するか」を詳細に記述する必要がありましたが、SDPでは「何が欲しいか」を定義するだけで、実行エンジンが最適な処理方法を自動的に決定してくれます。

SDPの主な特徴

特徴 説明
宣言型アプローチ SQLやPythonでテーブルの定義を書くだけ。実行順序や依存関係は自動的に解決される
データ品質管理 Expectations(期待値)を使って、データの品質ルールを定義し、違反を検出・処理できる
自動スケーリング サーバーレスコンピュートにより、ワークロードに応じた自動スケーリング
増分処理 ストリーミングテーブルにより、新しいデータのみを効率的に処理
CDC対応 AUTO CDCにより、Change Data Captureの複雑な処理を簡潔に記述可能
SCD Type 1/2対応 Slowly Changing Dimensionの履歴管理を組み込みでサポート

なぜ「宣言型」が重要なのか

従来の命令型パイプラインでは、以下のような課題がありました。

  • 依存関係の管理が複雑: どのテーブルがどのテーブルに依存しているか手動で管理
  • エラーハンドリングの煩雑さ: 失敗時のリトライや部分的な再実行のロジックを自分で書く必要
  • 増分処理の実装が大変: 「前回どこまで処理したか」を自前で追跡

SDPでは、これらを宣言的に定義することで、フレームワーク側が自動的に解決してくれます。

# SDPの宣言的なコード例
from pyspark import pipelines as dp

# これだけでストリーミングテーブルが定義できる
dp.create_streaming_table(
  name = "customers_cdc_clean",
  expect_all_or_drop = {
    "valid_id": "id IS NOT NULL",
    "valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"
  }
)

DLTからSDPへ - 何が変わったのか

機能面では大きな変更はありませんが、以下の点が更新されています。

  • ブランディング: Lakeflowファミリーの一員として再定位
  • UIの改善: ETLパイプライン作成のワークフローが統合・簡素化
  • Python API: pyspark.pipelinesモジュールが利用可能に(dltモジュールも引き続き使用可能)
  • サーバーレス対応の強化: より多くのリージョンでサーバーレス実行が可能に

既存のDLTパイプラインは引き続き動作しますので、急いで移行する必要はありません。

この記事で学べること

本記事では、公式チュートリアル「チェンジデータキャプチャを使用してETLパイプラインを構築する」をウォークスルーしながら、SDPの以下の機能を実践的に学びます。

  1. Auto Loaderによる増分データ取り込み - クラウドストレージからのデータ取り込みを自動化
  2. エクスペクテーションによるデータ品質管理 - 不正データの検出と除外
  3. AUTO CDCによる変更データ処理 - INSERT/UPDATE/DELETEを自動的にマテリアライズ
  4. SCD Type 2による履歴管理 - 変更履歴を追跡するテーブルの構築
  5. ジョブスケジューリング - パイプラインの定期実行設定

メダリオンアーキテクチャ(Bronze → Silver → Gold)に沿ったEnd-to-Endのパイプラインを構築していきますので、SDPの全体像を把握したい方はぜひ最後までお付き合いください。

それでは、Databricks Free Editionを使用して実際にチュートリアルを進めていきましょう。

ステップ1: パイプラインの作成

サイドメニューの +新規 > ETLパイプライン を選択します。
Screenshot 2025-12-18 at 15.52.33.png

パイプライン作成画面が開きます。上のボックスにパイプライン名Pipelines with CDC tutorialを入力します。
Screenshot 2025-12-18 at 15.53.11.png

必要に応じてカタログとスキーマを変更します。パイプラインが実行されることで生成されるテーブルなどがこちらに格納されます。今回はFree Editionのデフォルトworkspace.defaultのままとします。

空のファイルで開始を選択し、パイプラインを構成するコードファイルを格納するフォルダを選択します。
Screenshot 2025-12-18 at 15.54.28.png
Screenshot 2025-12-18 at 15.54.06.png

Lakeflowパイプラインエディターが表示されます。
Screenshot 2025-12-18 at 15.55.27.png

ステップ2: サンプルデータの作成

新たにコードを追加してサンプルデータ生成ロジックを記述します。エディタ左のパイプライン構成ペインで追加 > 探索をクリックします。探索ノートブックはパイプラインの実行に含まれません。
Screenshot 2025-12-18 at 15.56.11.png

適当な名前をつけて、言語はPythonを選択します。
Screenshot 2025-12-18 at 15.56.32.png

ノートブックが作成されます。
Screenshot 2025-12-18 at 15.56.48.png

以下のコードを記述します。

%pip install faker
# パイプラインのステップ1で使用したカタログとスキーマに合わせて更新してください
catalog = "workspace"
schema = dbName = db = "default"

spark.sql(f'USE CATALOG `{catalog}`')
spark.sql(f'USE SCHEMA `{schema}`')
spark.sql(f'CREATE VOLUME IF NOT EXISTS `{catalog}`.`{db}`.`raw_data`')
volume_folder =  f"/Volumes/{catalog}/{db}/raw_data"

try:
  dbutils.fs.ls(volume_folder+"/customers")
except:
  print(f"フォルダが存在しないため、{volume_folder} にデータを生成します...")
  from pyspark.sql import functions as F
  from faker import Faker
  from collections import OrderedDict
  import uuid
  fake = Faker()
  import random

  # ダミーデータ生成用UDF
  fake_firstname = F.udf(fake.first_name)
  fake_lastname = F.udf(fake.last_name)
  fake_email = F.udf(fake.ascii_company_email)
  fake_date = F.udf(lambda:fake.date_time_this_month().strftime("%m-%d-%Y %H:%M:%S"))
  fake_address = F.udf(fake.address)
  operations = OrderedDict([("APPEND", 0.5),("DELETE", 0.1),("UPDATE", 0.3),(None, 0.01)])
  fake_operation = F.udf(lambda:fake.random_elements(elements=operations, length=1)[0])
  fake_id = F.udf(lambda: str(uuid.uuid4()) if random.uniform(0, 1) < 0.98 else None)

  # 10万件のダミーデータを生成
  df = spark.range(0, 100000).repartition(100)
  df = df.withColumn("id", fake_id())
  df = df.withColumn("firstname", fake_firstname())
  df = df.withColumn("lastname", fake_lastname())
  df = df.withColumn("email", fake_email())
  df = df.withColumn("address", fake_address())
  df = df.withColumn("operation", fake_operation())
  df_customers = df.withColumn("operation_date", fake_date())
  # JSON形式で保存
  df_customers.repartition(100).write.format("json").mode("overwrite").save(volume_folder+"/customers")

途中、アシスタントを使って翻訳しています。
Screenshot 2025-12-18 at 15.58.43.png

ボリュームにファイルが生成されます。
Screenshot 2025-12-18 at 17.03.01.png

ノートブックから中身も確認できます。
Screenshot 2025-12-18 at 16.00.46.png

ステップ3: Auto Loaderを用いた増分データ取り込み

今度はパイプラインのロジックなので、最初に作成されたmy_transformation.pyにコードを記載します。なお、パイプラインロジックはノートブックではなく.pyファイルに記載することが推奨となっています。

from pyspark import pipelines as dp
from pyspark.sql.functions import *

# 使用しているカタログとスキーマ名に置き換えてください:
path = "/Volumes/workspace/default/raw_data/customers"

# ブロンズテーブル(顧客データの着信ゾーンからの増分取り込み用)を作成
dp.create_streaming_table("customers_cdc_bronze", comment="クラウドオブジェクトストレージのランディングゾーンから増分取り込まれる新規顧客データ")

# Append Flowを作成し、生データをブロンズテーブルに取り込む
@dp.append_flow(
  target = "customers_cdc_bronze",
  name = "customers_bronze_ingest_flow"
)
def customers_bronze_ingest_flow():
  return (
      spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("cloudFiles.inferColumnTypes", "true")
          .load(f"{path}")
  )

Screenshot 2025-12-18 at 16.02.51.png

ここで一度パイプラインを実行します。右上のパイプラインを実行をクリックします。コードからパイプラインが構成されて処理が実行されます。
output.gif

処理が完了すると、処理結果、エラーや警告の有無、処理レコード数などを確認できます。
Screenshot 2025-12-18 at 16.04.36.png

ステップ4: クリーンアップとデータ品質を追跡するためのエクスペクテーション

処理対象データに対するエクスペクテーション(期待)を定義することでデータ品質を担保することができます。新たな変換ファイルを追加して以下のロジックを記述します。expect_all_or_dropで記載している内容がエクスペクテーションです。

from pyspark import pipelines as dp
from pyspark.sql.functions import *

dp.create_streaming_table(
  name = "customers_cdc_clean",
  expect_all_or_drop = {"no_rescued_data": "_rescued_data IS NULL","valid_id": "id IS NOT NULL","valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"}
  )

@dp.append_flow(
  target = "customers_cdc_clean",
  name = "customers_cdc_clean_flow"
)
def customers_cdc_clean_flow():
  return (
      spark.readStream.table("customers_cdc_bronze")
          .select("address", "email", "id", "firstname", "lastname", "operation", "operation_date", "_rescued_data")
  )

Screenshot 2025-12-18 at 16.05.57.png

パイプラインを実行するとエクペクテーションに合格したのかどうかが確認できます。
Screenshot 2025-12-18 at 16.06.30.png
Screenshot 2025-12-18 at 17.16.18.png

ステップ5: AUTO CDCフローを使用して顧客テーブルをマテリアライズする

ここで、CDCフローを用いて変更の結果をマテリアライズ(具現化)します。ここでのマテリアライズはマテリアライズドビューを作るという意味ではなく変更履歴をまとめ上げてテーブルとして具現化することを指しています。

from pyspark import pipelines as dp
from pyspark.sql.functions import *

dp.create_streaming_table(name="customers", comment="クリーンでマテリアライズされた顧客テーブル")

dp.create_auto_cdc_flow(
  target="customers",  # 顧客テーブル(マテリアライズ対象)
  source="customers_cdc_clean",  # 入力されるCDCデータ
  keys=["id"],  # 行のアップサートに使用するキー
  sequence_by=col("operation_date"),  # operation_dateで重複排除し、最新値を取得
  ignore_null_updates=False,
  apply_as_deletes=expr("operation = 'DELETE'"),  # DELETE条件
  except_column_list=["operation", "operation_date", "_rescued_data"],  # 除外するカラム
)

Screenshot 2025-12-18 at 16.09.05.png

Screenshot 2025-12-18 at 16.09.05.png

変更履歴から構成されるcustomersテーブルが作成されます。
Screenshot 2025-12-18 at 17.20.39.png

ステップ6: SCDタイプ2による更新履歴の追跡

SCDタイプ2とは、データの変更履歴を全て保持するテーブルを指します。

from pyspark import pipelines as dp
from pyspark.sql.functions import *

# テーブルを作成(顧客のSCD2履歴用)
dp.create_streaming_table(
    name="customers_history", comment="顧客のスローリー・チェンジング・ディメンションタイプ2"
)

# すべての変更をSCD2として保存
dp.create_auto_cdc_flow(
    target="customers_history",
    source="customers_cdc_clean",
    keys=["id"],
    sequence_by=col("operation_date"),
    ignore_null_updates=False,
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "operation_date", "_rescued_data"],
    stored_as_scd_type="2",
)  # SCD2を有効化し、個別の更新を保存

パイプラインを実行すると、データレコードがいつまで有効だったのかの日時が含まれる履歴テーブルが生成されます。
Screenshot 2025-12-18 at 16.11.31.png
Screenshot 2025-12-18 at 16.11.48.png

ステップ7: 更新頻度の多い顧客を追跡するマテリアライズドビューの作成

最後に更新頻度を集計した結果を格納するマテリアライズドビューを作成します。

from pyspark import pipelines as dp
from pyspark.sql.functions import *

@dp.table(
  name = "customers_history_agg",
  comment = "顧客履歴の集計"
)
def customers_history_agg():
  return (
    spark.read.table("customers_history")
      .groupBy("id")
      .agg(
          count("address").alias("address_count"),
          count("email").alias("email_count"),
          count("firstname").alias("firstname_count"),
          count("lastname").alias("lastname_count")
      )
  )

Screenshot 2025-12-18 at 16.13.19.png

これで一連の処理を行うデータパイプラインを構成することができました。しかし、これでは毎回手動実行する必要があります。スケジュール実行されるようにジョブを作成しましょう。

ステップ8: ETLパイプラインを実行するジョブの作成

パイプライン画面右上のスケジュールをクリックすると、このパイプラインの実行スケジュールを設定することができます。

Screenshot 2025-12-18 at 16.13.43.png

シンプルな周期指定からより詳細な時刻設定が可能です。さらには、ストレージにファイルが到着したことをトリガーにすることもできます。

Screenshot 2025-12-18 at 16.14.12.png

これで、データパイプラインの作成からジョブ作成までをカバーできました。パイプライン実行の結果生成されるテーブルはカタログエクスプローラからも確認できます。

Screenshot 2025-12-18 at 16.15.25.png

Free Editionでも試せますので、ぜひトライしてみてください!

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?