3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricks: Lakeflow Declarative Pipelines入門

Posted at

Lakeflow Declarative Pipelines入門

概要

Databricksの機能の1つであるLakeflow Declarative Pipelinesに関して具体的な例を挙げながら説明します。

想定する読者

Databricksを触り始めたばかりでLakeflowの機能に関して知りたい方。

解説する機能

  • SQLを用いたLakeflow Declarative Pipelinesの構築手順
  • AutoLoader
  • Capturing Data Changes
  • Expectation
  • Streaming table
  • Materialized view / Temporary view
  • JSONファイルの読み取り・処理方法

Lakeflow Declarative Pipelinesとは

公式ドキュメントに記載された説明

Lakeflow 宣言型パイプラインは、 SQL と Pythonでバッチおよびストリーミング データパイプラインを作成するためのフレームワークです。 Lakeflow宣言型パイプラインの一般的なユースケースには、クラウドストレージ(Amazon S3、Azure ADLS Gen2、Google Cloud Storageなど)やメッセージバス(Apache Kafka、Amazon Kinesis、Google Pub/Sub、 Azure EventHub、Apache Pulsar)、およびインクリメンタル バッチとストリーミングの変換。

上記の説明にあるようにETLパイプラインを構築するための機能であり、個人的にはAWSのGlue、GCPのDataflowに似ていると思います。またLakeflow Declarive Pipelinesは以前Delta Live Tablesと呼ばれる機能でした。(公式ドキュメント )

Spark Structured Streamingとの違い

Lakeflow Declarative Pipelinesに似た機能としてSpark Structured Streamingが存在します。Spark Structured Streamingもストリーミングデータを処理するための機能です。以下がLakeflow Declarative Pipelinesとの比較になります。

Spark Structured Streaming Lakeflow Declarative Pipelines
Spark APIを使用した処理が必要。Spark SQL単体ではSストリーミングテーブルを作成できない。 Python, SQLのどちらでもストリーミングテーブルの作成が可能
Delta Lake table constraintsを利用したデータ品質の担保(ex. NOT NULL, CHECK) Delta Lake table constraintsに加えて、Expectationsなどの機能を利用したデータ品質の担保

Lakeflow Declarative Pipelines構築手順

Lakeflow Declarative PipelinesはPythonまたはSQLでの構築が可能です。このセクションではSQLを使用したパイプラインの構築をします。

構築するパイプラインに関して

ある架空のECサイトで働くデータエンジニアのMikeが、マーケティングチームの要望に応えてトランザクションデータから売上の情報を集約するパイプラインを構築するものとします。

使用するデータ

以下の3種類のデータ(JSON)が特定のVolumesに存在することを前提とします。

  • トランザクションデータ

    • transaction_log_1.json
      一番最後のレコードのtransaction_idnullになっていることに注目してください。

      • データの説明
        • transaction_id: トランザクションID
        • customer_id: 顧客ID
        • product_id: 商品ID
        • quantity: 商品数
        • type:アクションタイプ(ex. add_to_cart:カートに入れる、remove_from_cart:カートから外す、purchased:購入)
        • action_timestamp: アクションが発生したタイムスタンプ
      {"transaction_id": "00001", "customer_id":"S00001","product_id":"P00001","quantity": 2,"type": "add_to_cart", "action_timestamp": 1742566543}
      {"transaction_id": "00002", "customer_id":"S00001","product_id":"P00001","quantity": 2,"type": "purchased", "action_timestamp": 1742566850}
      {"transaction_id": "00003", "customer_id":"S00002","product_id":"P00002","quantity": 1,"type": "add_to_cart", "action_timestamp": 1742566603}
      {"transaction_id": "00004", "customer_id":"S00002","product_id":"P00002","quantity": 1,"type": "remove_from_cart", "action_timestamp": 1742566665}
      {"transaction_id": "00005", "customer_id":"S00002","product_id":"P00003","quantity": 1,"type": "add_to_cart", "action_timestamp": 1742825743}
      {"transaction_id": "00006", "customer_id":"S00002","product_id":"P00003","quantity": 1,"type": "purchased", "action_timestamp":  1742826043}
      {"transaction_id": "00007", "customer_id":"S00003","product_id":"P00001","quantity": 2,"type": "add_to_cart", "action_timestamp": 1742825743}
      {"transaction_id": "00008", "customer_id":"S00003","product_id":"P00001","quantity": 2,"type": "purchased", "action_timestamp": 1742826343}
      {"transaction_id": "00009", "customer_id":"S00004","product_id":"P00001","quantity": 2,"type": "add_to_cart", "action_timestamp": 1750774555}
      {"transaction_id": "00010", "customer_id":"S00004","product_id":"P00001","quantity": 2,"type": "purchased", "action_timestamp":  1750775155}
      {"transaction_id": "00011", "customer_id":"S00005","product_id":"P00001","quantity": 1,"type": "add_to_cart", "action_timestamp": 1753366555}
      {"transaction_id": "00012", "customer_id":"S00005","product_id":"P00001","quantity": 1,"type": "purchased", "action_timestamp":  1753366735}
      {"transaction_id": "00013", "customer_id":"S00001","product_id":"P00004","quantity": 1,"type": "add_to_cart", "action_timestamp": 1753319755}
      {"transaction_id": "00014", "customer_id":"S00001","product_id":"P00004","quantity": 1,"type": "purchased", "action_timestamp": 1753320355}
      {"transaction_id": "00016", "customer_id":"S00002","product_id":"P00001","quantity": 1,"type": "add_to_cart", "action_timestamp": 1753330735}
      {"transaction_id": "00017", "customer_id":"S00002","product_id":"P00001","quantity": 1,"type": "purchased", "action_timestamp": 1753331155}
      {"transaction_id": "00018", "customer_id":"S00002","product_id":"P00004","quantity": 1,"type": "add_to_cart", "action_timestamp": 1753417135}
      {"transaction_id": "00019", "customer_id":"S00002","product_id":"P00004","quantity": 1,"type": "purchased", "action_timestamp": 1753417855}
      {"transaction_id": null,    "customer_id":"S00001","product_id":"P00001","quantity": 2,"type": "add_to_cart", "action_timestamp": 1754022655}
      
  • 商品データ

    • データの説明

      • product_id: 商品ID
      • type: 商品タイプ
      • size: 商品サイズ
      • price: 商品価格
      • category: 商品カテゴリ
      • operaiton: レコードの変更情報(INSERT, UPDATE, DELETEのいずれか1つの値を持つ)
      • operation_time: レコードに変更が発生した時間
    • product_attributes_1.json

      {"product_id":"P00001", "type": "T-shirt", "size":"M", "price": 2000, "category": "Fashion/Tops", "operation": "INSERT", "operation_time": "2023-01-14T23:15:43"}
      {"product_id":"P00002", "type": "Shirt", "size":"L", "price": 5000, "category": "Fashion/Tops", "operation": "INSERT", "operation_time": "2023-01-14T23:15:43"}
      {"product_id":"P00003", "type": "Hoodie", "size":"L", "price": 10000, "category": "Fashion/Tops", "operation": "INSERT", "operation_time": "2023-01-14T23:15:43"}
      {"product_id":"P00004", "type": "Sweater", "size":"S", "price": 5000, "category": "Fashion/Tops", "operation": "INSERT", "operation_time": "2023-01-14T23:15:43"}
      {"product_id":"P00005", "type": "Cardiganr", "size":"M", "price": 4000, "category": "Fashion/Tops", "operation": "INSERT", "operation_time": "2023-01-14T23:15:43"}
      {"product_id":"P00006", "type": "Vest", "size":"M", "price": 4000, "category": "Fashion/Tops", "operation": "INSERT", "operation_time": "2023-01-14T23:15:43"}
      
    • product_attributes_2.json (1回目のジョブの実行後, 2回目のジョブの実行前に追加されるものと想定します)ここでは、product_idP00001のレコードが更新され、product_idP00006のレコードが削除されていることに注目してください

    • {"product_id":"P00001", "type": "T-shirt", "size":"M", "price": 3000, "category": "Fashion/Tops", "operation": "UPDATE", "operation_time": "2025-01-14T23:15:43"}
      {"product_id":"P00006", "type": null, "size": null, "price": null, "category": null, "operation": "DELETE", "operation_time": "2024-01-14T23:15:43"}
      
  • 顧客データ

    • データの説明
      • customer_id: 顧客ID
      • email: メールアドレス
      • credit_card: 登録クレジットカード情報
      • profile: プロフィール情報
        • first_name: 名前
        • last_name: 苗字
        • address: 住所
          • street: 番地
          • city: 町
          • country: 国
      • created_at: レコード作成日時
      • updated_at: レコード更新日時
    {"customer_id":"S00001","email":"sarah.mitchell92@gmail.com","credit_card": "American Express", "profile":"{\"first_name\":\"Sarah\",\"last_name\":\"Mitchell\", \"address\":{\"street\":\"A1 street\",\"city\":\"Paris\",\"country\":\"France\"}}","created_at": "2023-12-14T23:15:43.375Z", "updated_at":"2024-12-14T23:15:43.375Z"}
    {"customer_id":"S00002","email":"alex_thompson@yahoo.com","credit_card": "Discover" ,"profile":"{\"first_name\":\"Alex\",\"last_name\":\"Thompson\",\"address\":{\"street\":\"A1 street\",\"city\":\"New York\",\"country\":\"US\"}}","created_at": "2023-12-25T23:15:43.375Z", "updated_at":"2024-12-25T23:15:43.375Z"}
    {"customer_id":"S00003","email":"mike.rodriguez.2024@outlook.com","credit_card": "Mastercard","profile":"{\"first_name\":\"Mike\",\"last_name\":\"Rodriguez\",\"address\":{\"street\":\"A1 street\",\"city\":\"San Francisco\",\"country\":\"US\"}}","created_at": "2024-12-14T23:15:43.375Z", "updated_at":"2024-12-25T23:15:43.375Z"}
    {"customer_id":"S00004","email":"jennifer.lane@gmail.com","credit_card":"PayPay","profile":"{\"first_name\":\"Jennifer\",\"last_name\":\"Lane\", \"address\":{\"street\":\"A1 street\",\"city\":\"Tokyo\",\"country\":\"Japan\"}}","created_at": "2025-01-14T23:15:43.375Z", "updated_at":"2025-01-14T23:15:43.375Z"}
    {"customer_id":"S00005","email":"robert.chang88@yahoo.com","credit_card":"Visa", "profile":"{\"first_name\":\"Robert\",\"last_name\":\"Chang88\", \"address\":{\"street\":\"A1 street\",\"city\":\"Seoul\",\"country\":\"South Korea\"}}","created_at": "2025-03-14T23:15:43.375Z", "updated_at":"2025-03-14T23:15:43.375Z"}
    
パイプラインのアーキテクチャ

Medallion Architectureを採用して、上記のデータからデータを加工していきます。

  • Bronze Layer
    データを変換、加工・整形する前にこの層ではデータソースのデータをそのまま取り込みます。
    • product_attributes_raw (Streaming Table)
    • customers (Materialized View)
    • transaction_logs_raw (Streaming Table)
  • Silver Layer
    Bronze Layerで作成されたデータを変換、加工・整形します。
    • product_attributes (Streaming Table)
    • transaction_logs (Streaming Table)
  • Gold Layer
    BIや高度な分析用途で使用できる形にSilver Layerで作成されたデータを集約します
    • total_sales_per_product
    • total_amount_spent_per_customer
最終的に構築されたパイプライン図(DAG)

スクリーンショット 2025-08-26 15.33.18.png

Step1 : パイプラインの元となるNotebookの作成

1-1.Volumesが定義されているスキーマのパスを変数として設定

  %sql
  SET source_data.dataset_path=/Volumes/k_suzuki_catalog/ldp_sample_1;

スクリーンショット 2025-08-26 16.09.53.png

1-2. Bronze layerの作成

  • transactions_logs_rawテーブルの作成

         %sql
          CREATE OR REFRESH STREAMING TABLE transactions_logs_raw
          COMMENT "The raw transaction log, ingested from transactions_log volme"
          AS SELECT * FROM cloud_files("${source_data.dataset_path}/transactions_log",
                                      "json",
                                      map("cloudFiles.inferColumnTypes", "true"))
    
  • product_attributes_rawテーブルの作成

          %sql
            CREATE OR REFRESH STREAMING TABLE product_attributes_raw
            COMMENT "The raw product table, ingested from product_attributes volme"
            AS SELECT * FROM cloud_files("${source_data.dataset_path}/product_attributes",
                                        "json",
                                        map("cloudFiles.inferColumnTypes", "true"))
    

AutoLoaderを使用してVolumesからJSONログを読み取り、ストリーミングテーブルを作成します。AutoLoaderを使用することで既に処理をしたファイルは無視して、新規追加されたファイルのみの取り込みの処理を開始できます。
cloud_files 関数を使用することでAutoLoaderを適用して読み取りを実行することができます。AutoLoaderではParquet形式のファイルの場合、ファイルから事前定義されたスキーマ情報を指定して読み取りますが、CSVやJSON形式のファイルではデフォルトで全カラム情報をstring型として読み取るため、cloudFiles.inferColumnTypesの値をtrueにするオプションを設定を設定しています。

  • customersビューの作成

    %sql
    CREATE OR REPLACE MATERIALIZED VIEW customers
    COMMENT "The customer table, ingested from customer_attributes json"
    AS SELECT * FROM json.`${source_data.dataset_path}/customer_attributes/`
    

customersテーブルのソースデータは上書きされる可能性があるため、 MATERIALIZED VIEWとして定義します。Streaming Tableを作成する場合はソースデータがAppend Only(レコードの追加のみでレコードの変更が発生しない)なことが条件となります。

1-3. Silver layer

  • transaction_logsテーブルの作成

          CREATE OR REFRESH STREAMING TABLE transaction_logs (
           CONSTRAINT valid_transaction_id EXPECT (transaction_id IS NOT NULL) ON VIOLATION DROP ROW
          )
          COMMENT "The purchased products with valid transaction_id"
          AS
           SELECT 
             t.transaction_id,
             t.customer_id, 
             t.product_id,
             t.quantity,
             t.type AS action_type,
             c.email, 
             c.credit_card, 
             c.profile:first_name AS first_name, 
             c.profile:last_name AS last_name,
             c.profile:address:country AS country,
             cast(from_unixtime(t.action_timestamp, 'yyyy-MM-dd HH:mm:ss') AS timestamp) AS formatted_action_timestamp
           FROM STREAM(LIVE.transactions_logs_raw) AS t
           LEFT JOIN LIVE.customers AS c
             ON t.customer_id = c.customer_id
          ;
    

    Bronze Layerで作成した2つのテーブル、transactions_logs_rawとcustomersを結合してtransaction_logsテーブルを作成します。
    ここでポイントなるのが以下のEXPECT構文です。CONSTRAINTキーワードを付与することでデータが従わなければならない条件を定義しています。

      CONSTRAINT <constraint_name> EXPECT (<condition>) ON VIOLATION <action>
    
ON VIOLATION actions 動作
DROP ROW レコードを破棄する
FAIL UPDATE パイプラインを失敗させる
No action (default) レコードを保持。メトリクスに条件違反があったことをレポートする。

今回は、transaction_idがNULLの場合は不正レコードとして見なし後続のテーブルには挿入しないように該当の条件違反のレコードを破棄しています。

  • product_attributesテーブルの作成

    CREATE OR REFRESH STREAMING TABLE product_attributes;
    
    APPLY CHANGES INTO LIVE.product_attributes
     FROM STREAM(LIVE.product_attributes_raw)
     KEYS (product_id)
     APPLY AS DELETE WHEN operation = "DELETE"
     SEQUENCE BY operation_time
     COLUMNS * EXCEPT (operation, operation_time)
    

Change data capture (CDC) というプロセスを経てソーステーブル(product_attributes_raw) とターゲットテーブル(product_attributes)を同期させるSQLを定義しています。
具体的には以下のアクションと定義しています。

  • product_idをプライマリキーにレコードがターゲットテーブルに存在するかを調べます。存在しない場合はINSERT,存在する場合UPDATEすると判断します
  • operationの値がDELETEの場合、ソーステーブルからレコードが削除されたと見なしてターゲットテーブルから該当のレコードを削除します
  • operation_timeを SEQUENCE BYの値に指定することで、レコードのアクション(operation)の正しい順序(時間軸)を保証します

また今回は常に最新のデータでレコードを上書きする、SCDタイプ1(デフォルト)を採用しています。Lakeflow Declarative PipelinesではSCDタイプ1、SCDタイプ2をサポートしています。
CDCは従来なカスタムロジックによって実現されていた変更の反映の煩雑を減らしてくれる一方で、CDCにより変更が反映されたテーブルはappend-onlyでは無くなるため、後続の処理ではストリーミングテーブルとして使用することができなくなるというデメリットが存在します。

1-4.Gold layer

  • total_amount_spent_per_customerビューの作成
    CREATE OR REPLACE TEMPORARY VIEW total_amount_spent_per_customer
    COMMENT "Total amount spent per customer on products"
    AS
     SELECT 
       t.customer_id, 
       t.first_name, 
       t.last_name,
       sum(t.quantity * p.price) AS total_amount_spent
     FROM LIVE.transaction_logs AS t
     LEFT JOIN
       LIVE.product_attributes AS p
     WHERE
       action_type = "purchased"
     GROUP BY 
       customer_id, 
       first_name, 
       last_name
    ;
    
    

ここではtmporary viewの作成をしています。TEMPORARYキーワードが付く場合、このviewはパイプラインの実行中のみアクセス可能でcatalogには登録されません。

  • total_amount_spent_per_customerビューの作成

    CREATE OR REPLACE MATERIALIZED VIEW total_sales_per_product
    COMMENT "Total sales on products"
    AS
     SELECT 
       p.product_id,
       sum(t.quantity * p.price) AS total_sales
     FROM LIVE.transaction_logs AS t
     LEFT JOIN
       LIVE.product_attributes AS p
     WHERE
       action_type = "purchased"
     GROUP BY 
       p.product_id
    ;
    

ここではMaterialized Viewを定義しています。Materialized Viewはパイプラインの実行の度に再作成されます。ここではまたソースデータ(テーブル)が更新、削除されるため(append-only)ではないため、ストリーミングテーブルとして定義しています。

  • Streaming Table
    • 最後のジョブ実行から新規追加されたデータのみを処理します
    • CREATE STREAMING TABLE <table> AS <query>で作成されます
    • インプットデータはストリミーングソースである必要があります(ex. AutoLoader, STREAM関数によって読み取られるappend-onlyのテーブル)
  • Materialized views(live tables)
    • ジョブ実行のたびに全レコードを再処理します
      • CREATE MATERIALIZED VIEW <view> AS <query>で作成されます
      • インプットデータは固定データ、更新、削除や上書きがされるデータでも利用可能です

Step2 : パイプラインの元となるNotebookの作成

Jobs & PipelinesからETL Pipelineを作成します。

  • 設定画面のProduct editionはAdvancedを選択してください。これはExpectationがこのエディションでないと実行できないためです。(各エディション毎に提供される機能が異なります
  • Source CodeのPathsには先ほど作成したNotebookのパスを指定してください
  • AdvancedのConfigurationからNotebook内で指定したVolumesのパスの値を持つ変数と、その値のキー、バリューのペアを定義してください。(ex. key: source_data.dataset_path, value: /Volumes/k_suzuki_catalog/ldp_sample_1)
  • Pipeline modeをTriggeredに設定します
    Pipeline modeには、TriggeredまたはContinuousを値として選択できます。
Mode 動作
Triggered パイプラインを1度実行し、次回の手動または定期実行までクラスターを停止します
Continuous 一定間隔でパイプラインを実行。クラスターは常に起動しています。

設定が終わったら作成を押します。

スクリーンショット 2025-08-26 18.13.34.png

パイプラインの実行結果

パイプライン実行のmodeにはDevelopment、Productionの2つが存在します。今回はDevelopment modeで実行をします。

  • Production mode
    • パイプラインの実行の度に新規クラスターが作成されます。実行が終了したらクラスターは停止されます
    • リカバリー可能なエラー(ex. メモリーリーク、期限切れの認証情報)に関してはクラスターが再起動されます
    • エラー時のリトライ機能が存在します
  • Development mode
    • スピーディーな開発、テストを可能にするためクラスターはパイプライン実行の度に新規作成されず再利用されます
    • リトライ機能は無効化されています

1回目の実行結果

パイプラインを作成して、パイプラインの詳細画面に遷移したらバリデーションを実行します。バリデーション結果に問題なければパイプラインを実行します。

  • transaction_logsテーブル

CONTRAINT ... EXPECTtransaction_idnullのレコードを破棄する制約を加えため、テーブルにはnullレコードが入っていないのが分かります。

transaction_logs_v1.png

パイプラインジョブの実行画面のDAGに記載されたtransaction_logsテーブルをクリックすると実行ジョブでのExpectionsの状況が確認できます。今回の例では、transaction_idnullの1レコードがDROPされていることが確認できます。

スクリーンショット 2025-08-27 1.50.19.png

  • product_attributesテーブル
    データソースに存在する全レコードがテーブルに登録されています。

  • total_sales_per_product_v1.png

パイプラインジョブの実行画面では、UPSERTされたレコードが6つ存在することを確認できます。
スクリーンショット 2025-08-27 1.53.09.png

2回目の実行結果

product_attributes_2.jsonをVoumesに追加して2回目のパイプラインを実行します。

  • transaction_logsテーブル
    AutoLoaderにより、既に処理されたデータはスキップされています。これはデータに変更がないことから分かります。

transaction_logs_v1.png

  • product_attributesテーブル
    CDCにより以下2つの変更が発生してます。
  • product_idP00001のレコードが更新されている(価格が2000 -> 3000にUPDATE)
  • product_idP00006のレコードが削除されていることが分かります

product_attributes_v2.png

パイプラインジョブの実行画面では、UPSERTされたレコードが1つ、Deleteされてたレコードが1つ存在することを確認できます。
スクリーンショット 2025-08-27 1.54.52.png

まとめ

以上がLakeflow Declarative Pipelinesの基本的な機能となります。NotebookにSQLを記載するだけで手軽にパイプラインが構築可能な便利な機能だと思います。
詳細は公式ドキュメントの参照をお願いいたします。また不適切、ミスあればコメントにてお知らせください。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?