Lakeflow Declarative Pipelines入門
概要
Databricksの機能の1つであるLakeflow Declarative Pipelinesに関して具体的な例を挙げながら説明します。
想定する読者
Databricksを触り始めたばかりでLakeflowの機能に関して知りたい方。
解説する機能
- SQLを用いたLakeflow Declarative Pipelinesの構築手順
- AutoLoader
- Capturing Data Changes
- Expectation
- Streaming table
- Materialized view / Temporary view
- JSONファイルの読み取り・処理方法
Lakeflow Declarative Pipelinesとは
公式ドキュメントに記載された説明
Lakeflow 宣言型パイプラインは、 SQL と Pythonでバッチおよびストリーミング データパイプラインを作成するためのフレームワークです。 Lakeflow宣言型パイプラインの一般的なユースケースには、クラウドストレージ(Amazon S3、Azure ADLS Gen2、Google Cloud Storageなど)やメッセージバス(Apache Kafka、Amazon Kinesis、Google Pub/Sub、 Azure EventHub、Apache Pulsar)、およびインクリメンタル バッチとストリーミングの変換。
上記の説明にあるようにETLパイプラインを構築するための機能であり、個人的にはAWSのGlue、GCPのDataflowに似ていると思います。またLakeflow Declarive Pipelinesは以前Delta Live Tablesと呼ばれる機能でした。(公式ドキュメント )
Spark Structured Streamingとの違い
Lakeflow Declarative Pipelinesに似た機能としてSpark Structured Streamingが存在します。Spark Structured Streamingもストリーミングデータを処理するための機能です。以下がLakeflow Declarative Pipelinesとの比較になります。
Spark Structured Streaming | Lakeflow Declarative Pipelines |
---|---|
Spark APIを使用した処理が必要。Spark SQL単体ではSストリーミングテーブルを作成できない。 | Python, SQLのどちらでもストリーミングテーブルの作成が可能 |
Delta Lake table constraintsを利用したデータ品質の担保(ex. NOT NULL, CHECK) | Delta Lake table constraintsに加えて、Expectationsなどの機能を利用したデータ品質の担保 |
Lakeflow Declarative Pipelines構築手順
Lakeflow Declarative PipelinesはPythonまたはSQLでの構築が可能です。このセクションではSQLを使用したパイプラインの構築をします。
構築するパイプラインに関して
ある架空のECサイトで働くデータエンジニアのMikeが、マーケティングチームの要望に応えてトランザクションデータから売上の情報を集約するパイプラインを構築するものとします。
使用するデータ
以下の3種類のデータ(JSON)が特定のVolumesに存在することを前提とします。
-
トランザクションデータ
-
transaction_log_1.json
一番最後のレコードのtransaction_id
がnull
になっていることに注目してください。- データの説明
- transaction_id: トランザクションID
- customer_id: 顧客ID
- product_id: 商品ID
- quantity: 商品数
- type:アクションタイプ(ex. add_to_cart:カートに入れる、remove_from_cart:カートから外す、purchased:購入)
- action_timestamp: アクションが発生したタイムスタンプ
{"transaction_id": "00001", "customer_id":"S00001","product_id":"P00001","quantity": 2,"type": "add_to_cart", "action_timestamp": 1742566543} {"transaction_id": "00002", "customer_id":"S00001","product_id":"P00001","quantity": 2,"type": "purchased", "action_timestamp": 1742566850} {"transaction_id": "00003", "customer_id":"S00002","product_id":"P00002","quantity": 1,"type": "add_to_cart", "action_timestamp": 1742566603} {"transaction_id": "00004", "customer_id":"S00002","product_id":"P00002","quantity": 1,"type": "remove_from_cart", "action_timestamp": 1742566665} {"transaction_id": "00005", "customer_id":"S00002","product_id":"P00003","quantity": 1,"type": "add_to_cart", "action_timestamp": 1742825743} {"transaction_id": "00006", "customer_id":"S00002","product_id":"P00003","quantity": 1,"type": "purchased", "action_timestamp": 1742826043} {"transaction_id": "00007", "customer_id":"S00003","product_id":"P00001","quantity": 2,"type": "add_to_cart", "action_timestamp": 1742825743} {"transaction_id": "00008", "customer_id":"S00003","product_id":"P00001","quantity": 2,"type": "purchased", "action_timestamp": 1742826343} {"transaction_id": "00009", "customer_id":"S00004","product_id":"P00001","quantity": 2,"type": "add_to_cart", "action_timestamp": 1750774555} {"transaction_id": "00010", "customer_id":"S00004","product_id":"P00001","quantity": 2,"type": "purchased", "action_timestamp": 1750775155} {"transaction_id": "00011", "customer_id":"S00005","product_id":"P00001","quantity": 1,"type": "add_to_cart", "action_timestamp": 1753366555} {"transaction_id": "00012", "customer_id":"S00005","product_id":"P00001","quantity": 1,"type": "purchased", "action_timestamp": 1753366735} {"transaction_id": "00013", "customer_id":"S00001","product_id":"P00004","quantity": 1,"type": "add_to_cart", "action_timestamp": 1753319755} {"transaction_id": "00014", "customer_id":"S00001","product_id":"P00004","quantity": 1,"type": "purchased", "action_timestamp": 1753320355} {"transaction_id": "00016", "customer_id":"S00002","product_id":"P00001","quantity": 1,"type": "add_to_cart", "action_timestamp": 1753330735} {"transaction_id": "00017", "customer_id":"S00002","product_id":"P00001","quantity": 1,"type": "purchased", "action_timestamp": 1753331155} {"transaction_id": "00018", "customer_id":"S00002","product_id":"P00004","quantity": 1,"type": "add_to_cart", "action_timestamp": 1753417135} {"transaction_id": "00019", "customer_id":"S00002","product_id":"P00004","quantity": 1,"type": "purchased", "action_timestamp": 1753417855} {"transaction_id": null, "customer_id":"S00001","product_id":"P00001","quantity": 2,"type": "add_to_cart", "action_timestamp": 1754022655}
- データの説明
-
-
商品データ
-
データの説明
- product_id: 商品ID
- type: 商品タイプ
- size: 商品サイズ
- price: 商品価格
- category: 商品カテゴリ
- operaiton: レコードの変更情報(INSERT, UPDATE, DELETEのいずれか1つの値を持つ)
- operation_time: レコードに変更が発生した時間
-
product_attributes_1.json
{"product_id":"P00001", "type": "T-shirt", "size":"M", "price": 2000, "category": "Fashion/Tops", "operation": "INSERT", "operation_time": "2023-01-14T23:15:43"} {"product_id":"P00002", "type": "Shirt", "size":"L", "price": 5000, "category": "Fashion/Tops", "operation": "INSERT", "operation_time": "2023-01-14T23:15:43"} {"product_id":"P00003", "type": "Hoodie", "size":"L", "price": 10000, "category": "Fashion/Tops", "operation": "INSERT", "operation_time": "2023-01-14T23:15:43"} {"product_id":"P00004", "type": "Sweater", "size":"S", "price": 5000, "category": "Fashion/Tops", "operation": "INSERT", "operation_time": "2023-01-14T23:15:43"} {"product_id":"P00005", "type": "Cardiganr", "size":"M", "price": 4000, "category": "Fashion/Tops", "operation": "INSERT", "operation_time": "2023-01-14T23:15:43"} {"product_id":"P00006", "type": "Vest", "size":"M", "price": 4000, "category": "Fashion/Tops", "operation": "INSERT", "operation_time": "2023-01-14T23:15:43"}
-
product_attributes_2.json (1回目のジョブの実行後, 2回目のジョブの実行前に追加されるものと想定します)ここでは、
product_id
がP00001
のレコードが更新され、product_id
がP00006
のレコードが削除されていることに注目してください -
{"product_id":"P00001", "type": "T-shirt", "size":"M", "price": 3000, "category": "Fashion/Tops", "operation": "UPDATE", "operation_time": "2025-01-14T23:15:43"} {"product_id":"P00006", "type": null, "size": null, "price": null, "category": null, "operation": "DELETE", "operation_time": "2024-01-14T23:15:43"}
-
-
顧客データ
- データの説明
- customer_id: 顧客ID
- email: メールアドレス
- credit_card: 登録クレジットカード情報
- profile: プロフィール情報
- first_name: 名前
- last_name: 苗字
- address: 住所
- street: 番地
- city: 町
- country: 国
- created_at: レコード作成日時
- updated_at: レコード更新日時
{"customer_id":"S00001","email":"sarah.mitchell92@gmail.com","credit_card": "American Express", "profile":"{\"first_name\":\"Sarah\",\"last_name\":\"Mitchell\", \"address\":{\"street\":\"A1 street\",\"city\":\"Paris\",\"country\":\"France\"}}","created_at": "2023-12-14T23:15:43.375Z", "updated_at":"2024-12-14T23:15:43.375Z"} {"customer_id":"S00002","email":"alex_thompson@yahoo.com","credit_card": "Discover" ,"profile":"{\"first_name\":\"Alex\",\"last_name\":\"Thompson\",\"address\":{\"street\":\"A1 street\",\"city\":\"New York\",\"country\":\"US\"}}","created_at": "2023-12-25T23:15:43.375Z", "updated_at":"2024-12-25T23:15:43.375Z"} {"customer_id":"S00003","email":"mike.rodriguez.2024@outlook.com","credit_card": "Mastercard","profile":"{\"first_name\":\"Mike\",\"last_name\":\"Rodriguez\",\"address\":{\"street\":\"A1 street\",\"city\":\"San Francisco\",\"country\":\"US\"}}","created_at": "2024-12-14T23:15:43.375Z", "updated_at":"2024-12-25T23:15:43.375Z"} {"customer_id":"S00004","email":"jennifer.lane@gmail.com","credit_card":"PayPay","profile":"{\"first_name\":\"Jennifer\",\"last_name\":\"Lane\", \"address\":{\"street\":\"A1 street\",\"city\":\"Tokyo\",\"country\":\"Japan\"}}","created_at": "2025-01-14T23:15:43.375Z", "updated_at":"2025-01-14T23:15:43.375Z"} {"customer_id":"S00005","email":"robert.chang88@yahoo.com","credit_card":"Visa", "profile":"{\"first_name\":\"Robert\",\"last_name\":\"Chang88\", \"address\":{\"street\":\"A1 street\",\"city\":\"Seoul\",\"country\":\"South Korea\"}}","created_at": "2025-03-14T23:15:43.375Z", "updated_at":"2025-03-14T23:15:43.375Z"}
- データの説明
パイプラインのアーキテクチャ
Medallion Architectureを採用して、上記のデータからデータを加工していきます。
- Bronze Layer
データを変換、加工・整形する前にこの層ではデータソースのデータをそのまま取り込みます。- product_attributes_raw (Streaming Table)
- customers (Materialized View)
- transaction_logs_raw (Streaming Table)
- Silver Layer
Bronze Layerで作成されたデータを変換、加工・整形します。- product_attributes (Streaming Table)
- transaction_logs (Streaming Table)
- Gold Layer
BIや高度な分析用途で使用できる形にSilver Layerで作成されたデータを集約します- total_sales_per_product
- total_amount_spent_per_customer
最終的に構築されたパイプライン図(DAG)
Step1 : パイプラインの元となるNotebookの作成
1-1.Volumesが定義されているスキーマのパスを変数として設定
%sql
SET source_data.dataset_path=/Volumes/k_suzuki_catalog/ldp_sample_1;
1-2. Bronze layerの作成
-
transactions_logs_rawテーブルの作成
%sql CREATE OR REFRESH STREAMING TABLE transactions_logs_raw COMMENT "The raw transaction log, ingested from transactions_log volme" AS SELECT * FROM cloud_files("${source_data.dataset_path}/transactions_log", "json", map("cloudFiles.inferColumnTypes", "true"))
-
product_attributes_rawテーブルの作成
%sql CREATE OR REFRESH STREAMING TABLE product_attributes_raw COMMENT "The raw product table, ingested from product_attributes volme" AS SELECT * FROM cloud_files("${source_data.dataset_path}/product_attributes", "json", map("cloudFiles.inferColumnTypes", "true"))
AutoLoaderを使用してVolumesからJSONログを読み取り、ストリーミングテーブルを作成します。AutoLoaderを使用することで既に処理をしたファイルは無視して、新規追加されたファイルのみの取り込みの処理を開始できます。
cloud_files
関数を使用することでAutoLoaderを適用して読み取りを実行することができます。AutoLoaderではParquet形式のファイルの場合、ファイルから事前定義されたスキーマ情報を指定して読み取りますが、CSVやJSON形式のファイルではデフォルトで全カラム情報をstring型として読み取るため、cloudFiles.inferColumnTypes
の値をtrue
にするオプションを設定を設定しています。
-
customersビューの作成
%sql CREATE OR REPLACE MATERIALIZED VIEW customers COMMENT "The customer table, ingested from customer_attributes json" AS SELECT * FROM json.`${source_data.dataset_path}/customer_attributes/`
customersテーブルのソースデータは上書きされる可能性があるため、 MATERIALIZED VIEWとして定義します。Streaming Tableを作成する場合はソースデータがAppend Only(レコードの追加のみでレコードの変更が発生しない)なことが条件となります。
1-3. Silver layer
-
transaction_logsテーブルの作成
CREATE OR REFRESH STREAMING TABLE transaction_logs ( CONSTRAINT valid_transaction_id EXPECT (transaction_id IS NOT NULL) ON VIOLATION DROP ROW ) COMMENT "The purchased products with valid transaction_id" AS SELECT t.transaction_id, t.customer_id, t.product_id, t.quantity, t.type AS action_type, c.email, c.credit_card, c.profile:first_name AS first_name, c.profile:last_name AS last_name, c.profile:address:country AS country, cast(from_unixtime(t.action_timestamp, 'yyyy-MM-dd HH:mm:ss') AS timestamp) AS formatted_action_timestamp FROM STREAM(LIVE.transactions_logs_raw) AS t LEFT JOIN LIVE.customers AS c ON t.customer_id = c.customer_id ;
Bronze Layerで作成した2つのテーブル、transactions_logs_rawとcustomersを結合してtransaction_logsテーブルを作成します。
ここでポイントなるのが以下のEXPECT構文です。CONSTRAINTキーワードを付与することでデータが従わなければならない条件を定義しています。CONSTRAINT <constraint_name> EXPECT (<condition>) ON VIOLATION <action>
ON VIOLATION actions | 動作 |
---|---|
DROP ROW | レコードを破棄する |
FAIL UPDATE | パイプラインを失敗させる |
No action (default) | レコードを保持。メトリクスに条件違反があったことをレポートする。 |
今回は、transaction_idがNULLの場合は不正レコードとして見なし後続のテーブルには挿入しないように該当の条件違反のレコードを破棄しています。
-
product_attributesテーブルの作成
CREATE OR REFRESH STREAMING TABLE product_attributes;
APPLY CHANGES INTO LIVE.product_attributes FROM STREAM(LIVE.product_attributes_raw) KEYS (product_id) APPLY AS DELETE WHEN operation = "DELETE" SEQUENCE BY operation_time COLUMNS * EXCEPT (operation, operation_time)
Change data capture (CDC) というプロセスを経てソーステーブル(product_attributes_raw) とターゲットテーブル(product_attributes)を同期させるSQLを定義しています。
具体的には以下のアクションと定義しています。
- product_idをプライマリキーにレコードがターゲットテーブルに存在するかを調べます。存在しない場合はINSERT,存在する場合UPDATEすると判断します
- operationの値がDELETEの場合、ソーステーブルからレコードが削除されたと見なしてターゲットテーブルから該当のレコードを削除します
- operation_timeを
SEQUENCE BY
の値に指定することで、レコードのアクション(operation)の正しい順序(時間軸)を保証します
また今回は常に最新のデータでレコードを上書きする、SCDタイプ1(デフォルト)を採用しています。Lakeflow Declarative PipelinesではSCDタイプ1、SCDタイプ2をサポートしています。
CDCは従来なカスタムロジックによって実現されていた変更の反映の煩雑を減らしてくれる一方で、CDCにより変更が反映されたテーブルはappend-onlyでは無くなるため、後続の処理ではストリーミングテーブルとして使用することができなくなるというデメリットが存在します。
1-4.Gold layer
- total_amount_spent_per_customerビューの作成
CREATE OR REPLACE TEMPORARY VIEW total_amount_spent_per_customer COMMENT "Total amount spent per customer on products" AS SELECT t.customer_id, t.first_name, t.last_name, sum(t.quantity * p.price) AS total_amount_spent FROM LIVE.transaction_logs AS t LEFT JOIN LIVE.product_attributes AS p WHERE action_type = "purchased" GROUP BY customer_id, first_name, last_name ;
ここではtmporary viewの作成をしています。TEMPORARY
キーワードが付く場合、このviewはパイプラインの実行中のみアクセス可能でcatalogには登録されません。
-
total_amount_spent_per_customerビューの作成
CREATE OR REPLACE MATERIALIZED VIEW total_sales_per_product COMMENT "Total sales on products" AS SELECT p.product_id, sum(t.quantity * p.price) AS total_sales FROM LIVE.transaction_logs AS t LEFT JOIN LIVE.product_attributes AS p WHERE action_type = "purchased" GROUP BY p.product_id ;
ここではMaterialized Viewを定義しています。Materialized Viewはパイプラインの実行の度に再作成されます。ここではまたソースデータ(テーブル)が更新、削除されるため(append-only)ではないため、ストリーミングテーブルとして定義しています。
- Streaming Table
- 最後のジョブ実行から新規追加されたデータのみを処理します
-
CREATE STREAMING TABLE <table> AS <query>
で作成されます - インプットデータはストリミーングソースである必要があります(ex. AutoLoader,
STREAM
関数によって読み取られるappend-onlyのテーブル)
- Materialized views(live tables)
- ジョブ実行のたびに全レコードを再処理します
-
CREATE MATERIALIZED VIEW <view> AS <query>
で作成されます - インプットデータは固定データ、更新、削除や上書きがされるデータでも利用可能です
-
- ジョブ実行のたびに全レコードを再処理します
Step2 : パイプラインの元となるNotebookの作成
Jobs & PipelinesからETL Pipelineを作成します。
- 設定画面のProduct editionは
Advanced
を選択してください。これはExpectationがこのエディションでないと実行できないためです。(各エディション毎に提供される機能が異なります) - Source CodeのPathsには先ほど作成したNotebookのパスを指定してください
- AdvancedのConfigurationからNotebook内で指定したVolumesのパスの値を持つ変数と、その値のキー、バリューのペアを定義してください。(ex. key:
source_data.dataset_path
, value:/Volumes/k_suzuki_catalog/ldp_sample_1
) - Pipeline modeをTriggeredに設定します
Pipeline modeには、TriggeredまたはContinuousを値として選択できます。
Mode | 動作 |
---|---|
Triggered | パイプラインを1度実行し、次回の手動または定期実行までクラスターを停止します |
Continuous | 一定間隔でパイプラインを実行。クラスターは常に起動しています。 |
設定が終わったら作成を押します。
パイプラインの実行結果
パイプライン実行のmodeにはDevelopment、Productionの2つが存在します。今回はDevelopment modeで実行をします。
- Production mode
- パイプラインの実行の度に新規クラスターが作成されます。実行が終了したらクラスターは停止されます
- リカバリー可能なエラー(ex. メモリーリーク、期限切れの認証情報)に関してはクラスターが再起動されます
- エラー時のリトライ機能が存在します
- Development mode
- スピーディーな開発、テストを可能にするためクラスターはパイプライン実行の度に新規作成されず再利用されます
- リトライ機能は無効化されています
1回目の実行結果
パイプラインを作成して、パイプラインの詳細画面に遷移したらバリデーションを実行します。バリデーション結果に問題なければパイプラインを実行します。
- transaction_logsテーブル
CONTRAINT ... EXPECT
でtransaction_id
がnull
のレコードを破棄する制約を加えため、テーブルにはnull
レコードが入っていないのが分かります。
パイプラインジョブの実行画面のDAGに記載されたtransaction_logsテーブルをクリックすると実行ジョブでのExpectionsの状況が確認できます。今回の例では、transaction_id
がnull
の1レコードがDROPされていることが確認できます。
パイプラインジョブの実行画面では、UPSERTされたレコードが6つ存在することを確認できます。
2回目の実行結果
product_attributes_2.jsonをVoumesに追加して2回目のパイプラインを実行します。
- transaction_logsテーブル
AutoLoaderにより、既に処理されたデータはスキップされています。これはデータに変更がないことから分かります。
- product_attributesテーブル
CDCにより以下2つの変更が発生してます。 -
product_id
がP00001
のレコードが更新されている(価格が2000 -> 3000にUPDATE) -
product_id
がP00006
のレコードが削除されていることが分かります
パイプラインジョブの実行画面では、UPSERTされたレコードが1つ、Deleteされてたレコードが1つ存在することを確認できます。
まとめ
以上がLakeflow Declarative Pipelinesの基本的な機能となります。NotebookにSQLを記載するだけで手軽にパイプラインが構築可能な便利な機能だと思います。
詳細は公式ドキュメントの参照をお願いいたします。また不適切、ミスあればコメントにてお知らせください。