0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks LakeFlow Spark 宣言型パイプラインで実践するメタプログラミング 3 パターン

0
Last updated at Posted at 2026-05-05

はじめに

Databricks SDP(Spark Declarative Pipelines)にて複数の処理をパラメータにより実行する LakeFlow Spark 宣言型パイプラインのメタプログラミングについて紹介します。本記事では、LakeFlow Spark 宣言型パイプラインにおけるメタプログラミングを 3 つのパターンで実装方法を実績します。

  1. パターン 1: Python の辞書型リストで直接定義する方法
  2. パターン 2: データクラスを別 Python ファイルに定義してインポートする方法
  3. パターン 3: Databricks テーブルに JSON 形式の設定を保持し、読み取り後に辞書型へ変換する方法

なお、本検証は以下の Databricks 公式ドキュメントを参考にしています。

image.png

出所: チュートリアル:異なるパラメーターで複数のフローを作成する | Databricks on AWS

検証シナリオ

3 つのソーステーブル(table_01table_02table_03)から Change Data Feed を読み取り、それぞれ別のターゲットテーブルへ append_flow で書き込むパイプラインを構築します。3 パターンともパイプラインのロジック自体は同一で、設定の保持場所だけ を変えて挙動を比較します。

パターン 設定の保持場所 書き込み先スキーマ
1 パイプラインコード内の辞書型リスト cdf_demo_tgt_01
2 別 Python ファイル内のデータクラス cdf_demo_tgt_02
3 Delta テーブル内の JSON カラム cdf_demo_tgt_03

事前準備

ソースのカタログとスキーマを作成

検証用のカタログとスキーマを作成します。

CREATE CATALOG IF NOT EXISTS sdp_meta_01;
CREATE SCHEMA IF NOT EXISTS sdp_meta_01.cdf_demo_src;
USE CATALOG sdp_meta_01;
USE sdp_meta_01.cdf_demo_src;

image.png

ソースのテーブルを作成

CDF を有効化した Delta テーブルを 3 つ作成し、初期データを投入します。

DROP TABLE IF EXISTS table_01;

CREATE TABLE table_01 (
    customer_id BIGINT,
    customer_name STRING,
    status STRING,
    updated_at TIMESTAMP
  ) USING DELTA
  TBLPROPERTIES (delta.enableChangeDataFeed = true);

INSERT INTO table_01
  VALUES
    (1, 'Alice', 'active', TIMESTAMP '2026-05-05 10:00:00'),
    (2, 'Bob', 'active', TIMESTAMP '2026-05-05 10:00:00'),
    (3, 'Carol', 'active', TIMESTAMP '2026-05-05 10:00:00');

image.png

DROP TABLE IF EXISTS table_02;

CREATE TABLE table_02 (
    product_id BIGINT,
    product_name STRING,
    price DECIMAL(10, 2),
    updated_at TIMESTAMP
  ) USING DELTA
  TBLPROPERTIES (delta.enableChangeDataFeed = true);

INSERT INTO table_02
  VALUES
    (101, 'Keyboard', 12000.00, TIMESTAMP '2026-05-05 10:00:00'),
    (102, 'Mouse', 3000.00, TIMESTAMP '2026-05-05 10:00:00'),
    (103, 'Monitor', 35000.00, TIMESTAMP '2026-05-05 10:00:00');

image.png

DROP TABLE IF EXISTS table_03;

CREATE TABLE table_03 (
    product_id BIGINT,
    product_name STRING,
    price DECIMAL(10, 2),
    updated_at TIMESTAMP
  ) USING DELTA
  TBLPROPERTIES (delta.enableChangeDataFeed = true);

INSERT INTO table_03
  VALUES
    (101, 'Keyboard', 12000.00, TIMESTAMP '2026-05-05 10:00:00'),
    (102, 'Mouse', 3000.00, TIMESTAMP '2026-05-05 10:00:00'),
    (103, 'Monitor', 35000.00, TIMESTAMP '2026-05-05 10:00:00');

image.png

ターゲットのスキーマを作成

3 パターンそれぞれの書き込み先となるスキーマを作成します。

CREATE SCHEMA IF NOT EXISTS sdp_meta_01.cdf_demo_tgt_01;
CREATE SCHEMA IF NOT EXISTS sdp_meta_01.cdf_demo_tgt_02;
CREATE SCHEMA IF NOT EXISTS sdp_meta_01.cdf_demo_tgt_03;

image.png

パターン 1: Python の辞書型リストで直接定義する方法

最もシンプルなアプローチです。パイプラインコードの先頭に辞書型のリストとして設定を持ち、for ループでフロー定義を量産します。

ETL パイプラインを作成

image.png

my_transformation.py にコードを記述

from pyspark import pipelines as dp
from pyspark.sql import functions as F


CDF_TABLE_SPECS = [
    {
        "source_table": "sdp_meta_01.cdf_demo_src.table_01",
        "target_table": "sdp_meta_01.cdf_demo_tgt_01.table_01_cdf_events",
        "flow_name": "table_01_cdf_append_flow",
    },
    {
        "source_table": "sdp_meta_01.cdf_demo_src.table_02",
        "target_table": "sdp_meta_01.cdf_demo_tgt_01.table_02_cdf_events",
        "flow_name": "table_02_cdf_append_flow",
    },
    {
        "source_table": "sdp_meta_01.cdf_demo_src.table_03",
        "target_table": "sdp_meta_01.cdf_demo_tgt_01.table_03_cdf_events",
        "flow_name": "table_03_cdf_append_flow",
    },
]


def register_cdf_append_pipeline(spec: dict) -> None:
    source_table = spec["source_table"]
    target_table = spec["target_table"]
    flow_name = spec["flow_name"]

    dp.create_streaming_table(
        name=target_table,
        comment=f"CDF event table from {source_table}",
    )

    @dp.append_flow(
        target=target_table,
        name=flow_name,
        comment=f"Append CDF events from {source_table}",
    )
    def append_cdf_events(
        source_table=source_table,
    ):
        return (
            spark.readStream.option("readChangeFeed", "true")
            .table(source_table)
            .withColumn("audit__source_table", F.lit(source_table))
            .withColumn("audit__ingested_at", F.current_timestamp())
        )


for spec in CDF_TABLE_SPECS:
    register_cdf_append_pipeline(spec)

image.png

パイプラインを実行

3 つのストリーミングテーブルが、同一パイプライン内で並行して生成されることを確認します。

image.png

image.png

image.png

テーブルからデータを取得できることを確認

SELECT * FROM sdp_meta_01.cdf_demo_tgt_01.table_01_cdf_events;
SELECT * FROM sdp_meta_01.cdf_demo_tgt_01.table_02_cdf_events;
SELECT * FROM sdp_meta_01.cdf_demo_tgt_01.table_03_cdf_events;

image.png

image.png

image.png

パターン 2: データクラスを別 Python ファイルに定義してインポートする方法

設定とロジックをファイル単位で分離するアプローチです。@dataclass(frozen=True) を使うことで、設定の不変性が保証されると同時に、IDE の補完や型チェックの恩恵も受けられます。

ETL パイプラインを作成

image.png

my_transformation.py にコードを記述

設定の取得は get_cdf_table_specs_as_dicts() に委譲し、my_transformation.py 自体はロジックに集中できる構成にします。

from pyspark import pipelines as dp
from pyspark.sql import functions as F

from cdf_specs import get_cdf_table_specs_as_dicts


def register_cdf_append_pipeline(spec: dict) -> None:
    source_table = spec["source_table"]
    target_table = spec["target_table"]
    flow_name = spec["flow_name"]

    dp.create_streaming_table(
        name=target_table,
        comment=f"CDF event table from {source_table}",
    )

    @dp.append_flow(
        target=target_table,
        name=flow_name,
        comment=f"Append CDF events from {source_table}",
    )
    def append_cdf_events(
        source_table=source_table,
    ):
        return (
            spark.readStream
            .option("readChangeFeed", "true")
            .table(source_table)
            .withColumn("audit__source_table", F.lit(source_table))
            .withColumn("audit__ingested_at", F.current_timestamp())
        )


for spec in get_cdf_table_specs_as_dicts():
    register_cdf_append_pipeline(spec)

image.png

ディレクトリ名を pattern_2 に変更

パイプライン用のディレクトリを pattern_2 にリネームし、設定ファイルと同じ階層に配置します。

image.png
image.png

cdf_specs.py ファイルの作成とコードの記述

@dataclass(frozen=True)CdfTableSpec を定義します。asdict() で辞書化することで、呼び出し側のコード(my_transformation.py)はキー名でアクセスする既存ロジックをそのまま再利用できます。

image.png

image.png

from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class CdfTableSpec:
    source_table: str
    target_table: str
    flow_name: str
    cluster_by: list[str] | None = None


CDF_TABLE_SPECS = [
    CdfTableSpec(
        source_table="sdp_meta_01.cdf_demo_src.table_01",
        target_table="sdp_meta_01.cdf_demo_tgt_02.table_01_cdf_events",
        flow_name="table_01_cdf_events_cdf_append_flow",
    ),
    CdfTableSpec(
        source_table="sdp_meta_01.cdf_demo_src.table_02",
        target_table="sdp_meta_01.cdf_demo_tgt_02.table_02_cdf_events",
        flow_name="table_02_cdf_append_flow",
    ),
    CdfTableSpec(
        source_table="sdp_meta_01.cdf_demo_src.table_03",
        target_table="sdp_meta_01.cdf_demo_tgt_02.table_03_cdf_events",
        flow_name="table_03_cdf_append_flow",
    ),
]


def get_cdf_table_specs_as_dicts() -> list[dict]:
    return [asdict(spec) for spec in CDF_TABLE_SPECS]

image.png

パイプラインを実行

image.png

image.png

テーブルからデータを取得できることを確認

SELECT * FROM sdp_meta_01.cdf_demo_tgt_02.table_01_cdf_events;
SELECT * FROM sdp_meta_01.cdf_demo_tgt_02.table_02_cdf_events;
SELECT * FROM sdp_meta_01.cdf_demo_tgt_02.table_03_cdf_events;

image.png

image.png

image.png

パターン 3: Databricks テーブルに JSON 形式の設定を保持し、読み取り後に辞書型へ変換する方法

設定そのものを Delta テーブルに保持するアプローチです。設定の追加・削除を コード変更なし で行えるようになるため、運用フェーズで真価を発揮します。

ETL パイプラインを作成

image.png

image.png

image.png

設定テーブルの作成

enabled カラムでオン・オフを切り替えられるようにし、spec_json には JSON 形式で設定本体を格納します。スキーマ進化(設定項目の追加)に強い構造です。

DROP TABLE IF EXISTS sdp_meta_01.cdf_demo_tgt_03.cdf_table_specs;

CREATE TABLE sdp_meta_01.cdf_demo_tgt_03.cdf_table_specs (
  spec_name STRING NOT NULL,
  enabled BOOLEAN NOT NULL,
  spec_json STRING NOT NULL
)
USING DELTA;
INSERT OVERWRITE sdp_meta_01.cdf_demo_tgt_03.cdf_table_specs
SELECT
  'table_01' AS spec_name,
  true AS enabled,
  to_json(
    named_struct(
      'source_table', 'sdp_meta_01.cdf_demo_src.table_01',
      'target_table', 'sdp_meta_01.cdf_demo_tgt_03.table_01_cdf_events',
      'flow_name', 'table_01_cdf_append_flow'
    )
  ) AS spec_json
UNION ALL
SELECT
  'table_02' AS spec_name,
  true AS enabled,
  to_json(
    named_struct(
      'source_table', 'sdp_meta_01.cdf_demo_src.table_02',
      'target_table', 'sdp_meta_01.cdf_demo_tgt_03.table_02_cdf_events',
      'flow_name', 'table_02_cdf_append_flow'
    )
  ) AS spec_json
UNION ALL
SELECT
  'table_03' AS spec_name,
  true AS enabled,
  to_json(
    named_struct(
      'source_table', 'sdp_meta_01.cdf_demo_src.table_03',
      'target_table', 'sdp_meta_01.cdf_demo_tgt_03.table_03_cdf_events',
      'flow_name', 'table_03_cdf_append_flow'
    )
  ) AS spec_json;

my_transformation.py にコードを記述

load_cdf_table_specs_from_config_table() で設定テーブルを読み込み、enabled = true のレコードのみを取得して JSON をパースします。collect() を使う点は注意が必要ですが、設定テーブルのレコード数は限定的なため実用上の問題は起きにくいです。

import json

from pyspark import pipelines as dp
from pyspark.sql import functions as F


CONFIG_TABLE = "sdp_meta_01.cdf_demo_tgt_03.cdf_table_specs"


def load_cdf_table_specs_from_config_table() -> list[dict]:
    rows = (
        spark.read
        .table(CONFIG_TABLE)
        .where("enabled = true")
        .select("spec_name", "spec_json")
        .orderBy("spec_name")
        .collect()
    )

    return [json.loads(row["spec_json"]) for row in rows]


def register_cdf_append_pipeline(spec: dict) -> None:
    source_table = spec["source_table"]
    target_table = spec["target_table"]
    flow_name = spec["flow_name"]

    dp.create_streaming_table(
        name=target_table,
        comment=f"CDF event table from {source_table}",
    )

    @dp.append_flow(
        target=target_table,
        name=flow_name,
        comment=f"Append CDF events from {source_table}",
    )
    def append_cdf_events(
        source_table=source_table,
    ):
        return (
            spark.readStream
            .option("readChangeFeed", "true")
            .table(source_table)
            .withColumn("audit__source_table", F.lit(source_table))
            .withColumn("audit__ingested_at", F.current_timestamp())
        )


for spec in load_cdf_table_specs_from_config_table():
    register_cdf_append_pipeline(spec)

image.png

パイプラインを実行

image.png

image.png

設定テーブルの読み取りが入る分、パイプラインの初期化に 5 分程度かかりました。

image.png

テーブルからデータを取得できることを確認

SELECT * FROM sdp_meta_01.cdf_demo_tgt_03.table_01_cdf_events;
SELECT * FROM sdp_meta_01.cdf_demo_tgt_03.table_02_cdf_events;
SELECT * FROM sdp_meta_01.cdf_demo_tgt_03.table_03_cdf_events;

image.png

image.png

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?