このようなページができていたとは。
Delta Live Tablesとは
Delta Live Tables は、信頼性と保守性に優れた抽出、変換、読み込み (ETL) パイプラインの作成を簡略化するために設計された宣言型フレームワークです。 取り込むデータとその変換方法を指定すると、オーケストレーション、コンピュート管理、モニタリング、データ品質の適用、エラー処理など、データパイプラインの管理の主要な側面が Delta Live Tables 自動化されます。
エクスペクテーションとは
エクスペクテーションを定義することで、Delta Live Tablesパイプラインを流れるデータを検証する品質制約を適用します。エクスペクテーションに違反したレコードを削除、あるいはパイプラインの停止を行うことができます。
エクスペクテーションの可搬性の向上
そして、タイトルの話です。最初のページにはいくつかのエクスペクテーションのベストプラクティスがまとめられています。最初の例に膝を打ったので動かしてみます。
考え方は以下の通りです。
- パイプラインロジックとエクスペクテーションの分離: エクスペクテーションの定義をパイプラインのロジックとは別に格納します。パイプラインの定義にエクスペクテーションを組み込むことは可能ですが、可搬性や再利用性を損なってしまいます。モジュール化することで複数のデータセットやパイプラインにエクスペクテーションを簡単に適用できます。
- カスタムタグの適用: カスタムタグを追加して、関連するエクスペクテーションのグループを作成します。タグに基づいてエクスペクテーションをフィルタリングできるようになります。
- 一貫したエクスペクテーションの適用: 類似したデータセットに一貫性のあるエクスペクテーションを適用します。複数のデータセットとパイプラインで同じエクスペクテーションを使用して、同一のロジックを評価します。
まず、エクスペクテーションの定義をPythonファイルに記載します。ディクショナリーで名前name
と制約constraint
、カスタムタグtag
を定義しています。
def get_rules_as_list_of_dict():
return [
{
"name": "website_not_null",
"constraint": "Website IS NOT NULL",
"tag": "validity"
},
{
"name": "fresh_data",
"constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
"tag": "maintained"
},
{
"name": "social_media_access",
"constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
"tag": "maintained"
}
]
このPythonファイルと同じパスにパイプラインロジックを記述するノートブックを作成します。こちらの機能によって、パイプラインノートブックから上記rules_module.py
をPythonモジュールとしてインポートすることができます。
以下の肝はfrom rules_module import *
と関数get_rules
です。get_rules
はタグを引数として、tag
がマッチする制約条件をディクショナリーで返却します。これによって、以下にあるようにget_rules('validity')
でタグvalidity
に合致する制約条件をエクスペクテーションとしてテーブルに適用することができます。なるほど。二つ目のテーブルではタグmaintained
のエクスペクテーションを適用しています。
import dlt
from rules_module import *
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
データ品質ルールをテーブルから読み込む
:param tag: 検索するタグ
:return: タグに一致したルールの辞書
"""
return {
row['name']: row['constraint']
for row in get_rules_as_list_of_dict()
if row['tag'] == tag
}
@dlt.table
@dlt.expect_all_or_drop(get_rules('validity'))
def raw_farmers_market():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table
@dlt.expect_all_or_drop(get_rules('maintained'))
def organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
)
このノートブックをソースコードとして設定してパイプラインとして実行すると、以下のように期待した通りにエクスペクテーションが適用されています。