Transform data with Delta Live Tables | Databricks on AWS [2023/6/8時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
本書では、データセットに対する変換処理を宣言するためにどのようにDelta Live Tablesを活用し、クエリーのロジックを通じてどのようにレコードが処理されるのかを指定するのかを説明します。また、Delta Live Tablesパイプラインを構築する際に有用な共通変換パターンの例も説明します。
データフレームを返却する任意のクエリーに対するデータセットを定義することができます。Delta Live Tablesパイプラインの変換処理では、Apache Sparkのビルトインオペレーション、UDF、カスタムロジック、MLflowモデルを活用することができます。Delta Live Tablesパイプラインにデータが取り込まれると、新たなストリーミングテーブル、マテリアライズドビュー、ビューを作成するために、上流のソースに対して新たなデータセットを定義することができます。
ビュー、マテリアライズドビュー、ストリーミングテーブルをいつ使うのか
あなたのパイプラインを効率的、管理可能にするために、あなたのパイプラインクエリーを実装する際にベストなデータセットタイプを選択しましょう。
以下のケースではビューの使用を検討しましょう:
- 管理しやすいクエリーに分割したい大規模かつ複雑なクエリーがある。
- エクスペクテーションを用いて中間結果を検証したい。
- ストレージと計算コストを削減したくて、クエリー結果のマテリアライゼーションを必要としていない。テーブルをマテリアライズする際には、追加の計算リソースとストレージリソースが必要です。
以下のケースではマテリアライズドビューの使用を検討しましょう:
- 複数の下流のクエリーがテーブルを利用する。ビューはオンデマンドで計算され、ビューがクエリーされるたびに再計算されます。
- 他のパイプライン、ジョブ、クエリーがテーブルを利用する。ビューはマテリアライズされないので、同じパイプラインでしか利用できません。
- 開発過程でクエリー結果を参照したい。テーブルはマテリアライズされるので、パイプラインの外から参照、クエリーできるので、開発過程でテーブルを活用することで、計算処理の適切性の検証に役立ちます。検証後は、マテリアライズを必要としないクエリーをビューに変換します。
以下のケースではストリーミングテーブルの使用を検討しましょう:
- 連続的、インクリメンタルに増加するデータソースに対してクエリーを定義する。
- クエリー結果をインクリメンタルに計算する必要がある。
- パイプラインで高いスループットと低レーテンシーが求められる。
注意
ストリーミングソースに対しては、常にストリーミングテーブルが定義されます。CDCフィードからの更新を適用するために、APPLY CHANGES INTO
とストリーミングソースを活用することもできます。Delta Live Tablesによるチェンジデータキャプチャ(CDC)をご覧ください。
単一のパイプラインでストリーミングテーブルとマテリアライズドビューを組み合わせる
ストリーミングテーブルは、Apache Sparkの構造化ストリーミングの処理保証を継承し、ソーステーブルが更新されるのではなく、常に新規の行が追加される追加のみのデータソースからのクエリーを処理するように設定されます。
注意
デフォルトでは、ストリーミングテーブルは追加のみのデータソースを必要としますが、ストリーミングソースが更新や削除を必要とする別のストリーミングテーブルの場合、skipChangeCommitsフラグでこの挙動をオーバーライドすることができます。
一般的なストリーミングパターンには、パイプラインにおける初期データセットを作成するためにソースデータを取り込むというものがあります。この初期データセットは一般的にはブロンズテーブルと呼ばれ、多くの場合シンプルな変換処理を行います。
一方で、パイプラインにおける最終テーブルはゴールドテーブルと呼ばれ、多くの場合、複雑な集計処理やAPPLY CHANGES INTO
オペレーションのターゲットであるソースからの読み込みを行います。これらのオペレーションは性質上、追加ではなく更新処理を伴い、これらはストリーミングテーブルの入力としてサポートされていません。これらの変換処理はマテリアライズドビューの方が適しています。
単一のパイプラインでストリーミングテーブルとマテリアライズドビューを組み合わせることで、パイプラインをシンプルにすることができ、高コストな生データの再取り込みや再処理を回避し、効率的にエンコードされ、フィルタリングされたデータセットに対して複雑な集計処理を行うために、SQLのフルパワーを活用することができます。以下のサンプルでは、このタイプの混成処理の例を示しています:
注意
これらのサンプルでは、クラウドストレージからファイルをロードするために、Auto Loaderを活用しています。Unity Catalogが有効化されたパイプラインでAuto Loaderを用いてファイルをロードするには、外部ロケーションを使用しなくてはなりません。Delta Live TablesとUnity Catalogmの活用に関しては、Delta Live TablesパイプラインにおけるUnity Catalogの使用をご覧ください。
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("s3://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return dlt.read_stream("streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return dlt.read("streaming_silver").groupBy("user_id").count()
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM cloud_files(
"s3://path/to/raw/data", "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...
CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id
インクリメンタル処理でS3からJSONファイルを効率的に読み込むためのAuto Loaderの活用に関してはドキュメントを参照ください。
ストリーム-静的テーブルのjoin
ストリーム-静的テーブルのjoinは、主に静的なディメンションテーブルと追加のみのデータの連続ストリーム非正規化を行う際には良い選択肢となります。
それぞれのパイプラインのアップデートによって、ストリームからの新規レコードは、静的テーブルの最新のスナップショットとjoinされます。ストリーミングテーブルからの対応データが処理された後に、静的テーブルのレコードが追加、更新され他場合、フルリフレッシュを行い限りい、結果レコードは再計算されません。
トリガー実行が設定されたパイプラインでは、静的テーブルはアップデートが開始した時点の結果を返却します。連続実行が設定されたパイプラインでは、テーブルがアップデートを処理する都度、静的テーブルの最新バージョンがクエリーされます。
以下にストリームと静的テーブルのjoinの例を示します。
@dlt.table
def customer_sales():
return dlt.read_stream("sales").join(read("customers"), ["customer_id"], "left")
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
INNER JOIN LEFT LIVE.customers USING (customer_id)
効率的に集計計算を行う
min、max、sumのようなシンプルで分配可能な集計処理や平均、標準偏差のような代数集計をインクリメンタルに計算するためにストリーミングテーブルを活用することができます。限定数のグループ、例えば、GROUP BY country
句のようなクエリーではインクリメンタルな集計を行うことをお勧めします。それぞれのアップデートで新規の入力データのみが読み込まれます。
Delta Live TablesパイプラインでMLflowモデルを使う
Delta Live TablesパイプラインでMLflowでトレーニングしたモデルを活用することができます。MLflowモデルはDatabricksでは変換処理として取り扱われるので、Sparkデータフレームの入力に対して動作し、結果としてSparkデータフレームが返却されることを意味します。Delta Live Tablesはデータフレームに対してデータセットを定義するので、MLflowを活用するApache Sparkワークロードを数行のコードでDelta Live Tablesに変換することができます。MLflowに関しては、Databricks MLflowガイドをご覧ください。
MLflowモデルを呼び出すPythonノートブックがすでにあるのであれば、@dlt.table
デコレーターを用いてこのコードをDelta Live Tablesに適応させることができ、関数が変換結果を返却するように定義することができます。Delta Live TablesはデフォルトではMLflowをインストールしていないので、ノートブックの先頭で%pip install mlflow
を行い、mlflow
とdlt
をインポートするようにしてください。Delta Live Tables構文の入門については、Tutorial: Declare a data pipeline with Python in Delta Live Tablesをご覧ください。
Delta Live TablesでMLflowモデルを活用するには、以下のステップを完了してください:
- MLflowモデルのランIDとモデル名を取得します。MLflowモデルのURIを構成するために、ランIDとモデル名が使用されます。
- MLflowモデルをロードするためのSpark UDFを定義するためにURIを使います。
- MLflowモデルを使うためにテーブル定義でUDFを使います。
以下の例では、このパターンの基本的な構文を説明しています。
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return dlt.read(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
完全な例として、以下のコードでは、ローンリスクデータでトレーニングされたMLflowモデルをロードするloaded_model_udf
というSpark UDFを定義しています。UDFの引数として、予測に必要なデータカラムを引き渡しています。テーブルloan_risk_predictions
はloan_risk_input_data
のそれぞれの行の予測結果を計算します。
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return dlt.read("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
手動の削除や更新を保持する
Delta Live Tablesを用いることで、テーブルから手動でレコードを削除、更新することができ、後段のテーブルの再計算を行うためにリフレッシュすることができます。
デフォルトでは、Delta Live Tablesはパイプラインがアップデートされるたびに入力データに基づいてテーブルの結果を再計算するので、ソースデータから削除されたレコードが再ロードされないようにする必要があります。テーブルプロパティpipelines.reset.allowed
をfalse
に設定することで、テーブルのリフレッシュを防ぎつつも、テーブルに対するインクリメンタルな書き込みやテーブルへの新規データの流入は妨げません。
以下の図では、2つのストリーミングテーブル活用の例を示しています:
-
raw_user_table
はソースから生のユーザーデータを取り込みます。 -
bmi_table
は、raw_user_table
の身長体重を用いて、BMIスコアをインクリメンタルに計算します。
あなたはraw_user_table
のユーザーレコードを手動で削除、更新し、bmi_table
を再計算したいものとします。
以下のコードでは、意図した変更が将来にわたって保持されるように、raw_user_table
のテーブルプロパティpipelines.reset.allowed
をfalse
に設定し、パイプラインのアップデートが実行された際に後段のテーブルが再計算されることをデモンストレーションしています。
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM cloud_files("/databricks-datasets/iot-stream/data-user", "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);