Delta Live Tables cookbook | Databricks on AWS [2021/10/21時点]の翻訳です。
注意
この機能はパブリックプレビューです。アクセスするためには、Delta Live Tablesへのアクセスをリクエストしてください。(訳者注:Databricks担当にご連絡ください)
本書では、Delta Live Tablesのパイプラインにおける一般的なタスクを実装するためのお勧めとソリューションを説明しています。
- エクスペクテーションをポータブルかつ再利用可能にする
- SQLでPython UDFを使う
- Delta Live TablesパイプラインでMLflowモデルを使う
- 開発用、テスト用のサンプルデータセットを作成する
- 複数のliveテーブルをプログラムから管理、作成する
エクスペクテーションをポータブルかつ再利用可能にする
シナリオ
共通のデータ品質に関するルールを複数のテーブルに対して適用したい、あるいは、パイプライン開発者とは別のチームメンバーがデータ品質に関するルールを開発、メンテナンスしたいと考えています。
ソリューション
パイプラインの実装とは別にデータ品質ルールを管理します。DBFS(Databricksファイルシステム)やクラウドストレージ、Deltaテーブルなど信頼性があり、容易にアクセスできるフォーマットで、ルールを格納します。以下の例では、ルールを管理するためにrules.csv
というCSVファイルをDBFSに格納しています。rules.csv
のそれぞれのルールはタグで分類されます。どのルールを適用するのかを決定するために、データセット定義でこのタグを使用します。
name, constraint, tag
website_not_null,"Website IS NOT NULL",validity
location_not_null,"Location IS NOT NULL",validity
state_not_null,"State IS NOT NULL",validity
fresh_data,"to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",maintained
social_media_access,"NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",maintained
以下のPythonのサンプルでは、rules.csv
に格納されたルールに基づいてデータ品質のエクスペクテーションを定義しています。get_rules()
関数はrules.csv
からルールを読み込み、関数に渡された引数tag
にマッチするルールを含むディクショナリーを返却します。データ品質制約を適用するために、このディクショナリーは@dlt.expect_all_*()
デコレーターで適用されます。例えば、validity
タグがついているルールに失敗したレコードはraw_farmers_market
から削除されます。
import dlt
from pyspark.sql.functions import expr, col
def get_rules(tag):
"""
loads data quality rules from csv file
:param tag: tag to match
:return: dictionary of rules that matched the tag
"""
rules = {}
df = spark.read.format("csv").option("header", "true").load("/path/to/rules.csv")
for row in df.filter(col("tag") == tag).collect():
rules[row['name']] = row['constraint']
return rules
@dlt.table(
name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
return (
spark.read.format('csv').option("header", "true")
.load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
)
@dlt.table(
name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
return (
dlt.read("raw_farmers_market")
.filter(expr("Organic = 'Y'"))
.select("MarketName", "Website", "State",
"Facebook", "Twitter", "Youtube", "Organic",
"updateTime"
)
)
SQLでPython UDFを使う
シナリオ
Delta Live Tablesデータセットを定義するために、SQLのシンプルさを必要とするが、SQLでサポートされていない変換処理を行いたいと考えています。
ソリューション
お使いのSQLクエリーでPythonのユーザー定義関数(UDF)を使用します。以下の例では、入力引数の二乗を返却するUDFsquare()
を定義、登録し、SQLでUDFsquare()
をコールしています。
-
UDFを定義、登録する。
Default LanguageをPythonに設定してノートブックを作成し、以下のコードをセルに追加します。
Pythondef square(i: int) -> int: """ Simple udf for squaring the parameter passed. :param i: column from Pyspark or SQL :return: squared value of the passed param. """ return i * i spark.udf.register("makeItSquared", square) # register the square udf for Spark SQL
-
UDFをコールします。
SQLノートブックを作成し、以下のクエリーをセルに追加します。SQL
CREATE LIVE TABLE raw_squared
AS SELECT makeItSquared(2) AS numSquared;
```
- パイプランを作成します。
新規Delta Live Tablesパイプラインを作成し、作成したノートブックをNotebook Librariesに追加します。Create Pipelineダイアログでノートブックを追加するのにAdd notebook libraryボタンを使用するか、ノートブックを設定するためにDelta Live Tablesのsettingsのlibraries
フィールドに追加します。
Delta Live TablesパイプラインでMLflowモデルを使う
シナリオ
パイプラインにおいて、MLflowのトレーニング済みモデルを使用したいと考えています。
ソリューション
Delta Live TablesパイプラインでMLflowモデルを使用するには、以下の手順を踏みます。
- MLflowモデルのrun IDとモデル名を取得します。run IDとモデル名は、MLflowモデルのURIを構築するために必要となります。
- MLflowモデルをロードするSpark UDFを定義するためにURIを使用します。
- MLflowモデルを使用するためにテーブル定義でUDFをコールします。
以下の例では、ローンリスクデータに対してトレーニングしたMLflowモデルをロードするloaded_model
というSpark UDFを定義しています。gtb_scoring_train_data
とgtb_scoring_valid_data
というテーブルを定義するためにUDFloaded_model
が使用されます。
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id= "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = "runs:/{run_id}/{model_name}".format(run_id=run_id, model_name=model_name)
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML scored training dataset based on Loan Risk",
table_properties={
"quality": "gold"
}
)
def gtb_scoring_train_data():
return dlt.read("train_data")
.withColumn('predictions', loaded_model(struct(features)))
@dlt.table(
comment="GBT ML scored valid dataset based on Loan Risk",
table_properties={
"quality": "gold"
}
)
def gtb_scoring_valid_data():
return dlt.read("valid_data")
.withColumn('predictions', loaded_model(struct(features)))
開発用、テスト用のサンプルデータセットを作成する
シナリオ
データのサブセットや特定のレコードタイプなど、開発やテストのためのサンプルデータセットを作成したいと考えています。
ソリューション
単一あるいは共有ノートブックで変換ロジックを実装します。そして、環境に基づいて複数のデータセットを定義する別のノートブックを作成します。例えば、本格運用(production)においては、パイプラインのために完全なセットのデータを定義するノートブックを作成します。
CREATE INCREMENTAL LIVE TABLE input_data AS SELECT * FROM cloud_files("/production/data", "json")
次に、要件に基づいてサンプルのデータを定義するノートブックを作成します。例えば、テストのために特定のレコードを含む小規模なデータセットを生成します。
CREATE LIVE TABLE input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading
あるいは、開発、テストのために本格運用ようのデータのサブセットを作成することができます。
CREATE LIVE TABLE input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY
異なるデータセットを使用するには、変換ロジックを実装するノートブックを用いて複数のパイプラインを作成します。それぞれのパイプラインはLIVE.input_data
データセットから読み込みを行いますが、環境固有のデータセットを作成するノートブックを含めるように設定されます。
複数のliveテーブルをプログラムから管理、作成する
シナリオ
少しのパラメーターだけが異なる複数のフロー、データセット定義を含むパイプラインがあるとします。この冗長性はエラーが混入しやすく、メンテナンスしにくいものとなります。例えば、以下の図では、異なるカテゴリーの緊急連絡に対する最速のレスポンスが可能となる近隣の消防署を検索するためのデータセットを使用するパイプラインのグラフを示しています。この例では、並列のフローにおいては、わずかなパラメーターのみが異なっています。
ソリューション
冗長なフロー定義を生成、管理するオーバーヘッドを削減するために、メタプログラミングパターンを使用することができます。Delta Live Tablesにおけるメタプログラミングは、Pythonの内部関数(inner function)を使うことで達成することができます。これらの関数は遅延評価されるので、入力パラメーター以外が同じフローを作成するために使用することができます。以下の例のように、それぞれの呼び出しには、それぞれのテーブルをどのように作成するのかを制御する異なるパラメーターセットを含めることができます。
import dlt
from pyspark.sql.functions import *
@dlt.table(
name="raw_fire_department",
comment="raw table for fire department response"
)
@dlt.expect_or_drop("valid_received", "received IS NOT NULL")
@dlt.expect_or_drop("valid_response", "responded IS NOT NULL")
@dlt.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
return (
spark.read.format('csv')
.option('header', 'true')
.option('multiline', 'true')
.load('/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv')
.withColumnRenamed('Call Type', 'call_type')
.withColumnRenamed('Received DtTm', 'received')
.withColumnRenamed('Response DtTm', 'responded')
.withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
.select('call_type', 'received', 'responded', 'neighborhood')
)
all_tables = []
def generate_tables(call_table, response_table, filter):
@dlt.table(
name=call_table,
comment="top level tables by call type"
)
def create_call_table():
return (
spark.sql("""
SELECT
unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
neighborhood
FROM LIVE.raw_fire_department
WHERE call_type = '{filter}'
""".format(filter=filter))
)
@dlt.table(
name=response_table,
comment="top 10 neighborhoods with fastest response time "
)
def create_response_table():
return (
spark.sql("""
SELECT
neighborhood,
AVG((ts_received - ts_responded)) as response_time
FROM LIVE.{call_table}
GROUP BY 1
ORDER BY response_time
LIMIT 10
""".format(call_table=call_table))
)
all_tables.append(response_table)
generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")
@dlt.table(
name="best_neighborhoods",
comment="which neighbor appears in the best response time list the most"
)
def summary():
target_tables = [dlt.read(t) for t in all_tables]
unioned = functools.reduce(lambda x,y: x.union(y), target_tables)
return (
unioned.groupBy(col("neighborhood"))
.agg(count("*").alias("score"))
.orderBy(desc("score"))
)