Delta Live Tables user guide | Databricks on AWS [2021/12/20時点]の翻訳です。
Databricksクイックスタートガイドのコンテンツです。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
プレビュー
この機能はパブリックプレビューです。アクセスする際にはDatabricks担当者にお問い合わせください。
本書では、Delta Live Tablesのコンポーネントの詳細と、Delta Live Tablesのパイプラインを実行するために、どのようにコンポーネントが実装されているのかを説明します。
Delta Live Tablesのコンセプト
本章では以下を説明します。
パイプライン
Delta Live Tablesにおける実行のメインユニットとなるのがパイプラインです。パイプラインはデータソースとターゲットデータセットを接続する有向非巡回グラフ(DAG)です。SQLクエリー、あるいはSpark SQL、Koalasデータフレームを返却するPython関数でDelta Live Tablesのコンテンツを定義します。また、パイプラインは、パイプラインを実行するのに必要なセッティングを有しています。データセットを定義する際に、オプションとしてデータ品質制約を定義できます。
本節では以下を説明します。
クエリー
クエリーは、データソースとターゲットデータセットを定義することでデータ変換を実装します。Delta Live TablesのクエリーはPythonあるいはSQLで実装できます。
エクスペクテーション
データセットのコンテンツに対するデータ品質を指定するためにエクスペクテーションを使用します。制約に合致しないレコードの追加を防ぐ従来のデータベースのCHECK
とは異なり、エクスペクテーションはデータ品質要件を満たさないデータを処理する際の柔軟性を提供します。この柔軟性によって、汚いと予想するデータや厳密な品質要件に合致すべきデータを処理、蓄積することができます。
検証に失敗したレコードを保持するか、レコードを削除するか、パイプラインを停止するようにエクスペクテーションを定義できます。
パイプラインのセッティング
パイプラインのセッティングはJSONで定義され、パイプラインの実行に必要な以下のパラメータを含みます:
- Delta Lake形式のターゲットデータセットを作成するテーブルとビューを記述したクエリーを含むライブラリ(ノートブック形式)
- 処理に必要なテーブルとメタデータが格納されているクラウドストレージ上のロケーション。DBFSあるいは他の場所を指定できます。
- データ処理が行われるSparkクラスターのオプション設定
詳細はDelta Live Tablesの設定を参照してください。
パイプラインのアップデート
パイプライン作成後は実行することができます。updateを実行します。アップデートは:
- 適切な設定でクラスターを起動します。
- 定義された全てのテーブル、ビューを発見し、不適切なカラム名、依存関係の欠如、文法エラーなどのエラーをチェックします。
- 最新のデータを用いて全てのテーブル、ビューを作成あるいは更新します。
パイプラインがトリガーされた場合には、パイプライン上の全てのテーブルの更新が完了すれば処理を停止します。
トリガーされたアップデートが成功した場合には、アップデート開始時点のデータに基づいてそれぞれのテーブルが更新されたことが保証されます。
低レーテンシーが求められるユースケースにおいては、連続でパイプラインをアップデートするように設定できます。
パイプラインの実行モードの選択に関する詳細は連続パイプライン、トリガーパイプラインを参照ください。
データセット
Delta Live Tablesパイプラインにおけるデータセットには2つのタイプが存在します: ビューとテーブルです。
- ビューはSQLにおける一時ビューと同様のものであり、何かしらの計算のエイリアスです。ビューを用いることで、複雑なクエリーを小さい、あるいは理解しやすいクエリーに分割することができます。ビューによって、既存の変換処理を一つ以上のテーブルに対するソースとして再利用できます。ビューはパイプライン内でのみ有効であり、インタラクティブにクエリーを発行することはできません。
- テーブルは従来のマテリアライズドビューと同様のものです。Delta Live Tablesランタイムは自動でテーブルをDeltaフォーマットで作成し、テーブルを作成するクエリーの最新の結果に基づいてこれらのテーブルが更新されることを保証します。
テーブルはインクリメンタルテーブルあるいは完全テーブルとなります。インクリメンタルテーブルは、テーブル全体を再計算することなしに継続的に到着するデータに基づくアップデートをサポートしています。完全テーブルはそれぞれのアップデートで全体が再計算されます。
後段の利用者が参照、検索できるように、テーブルを公開することができます。
一時テーブル
外部での利用を想定しない中間テーブルを公開しないように、一時テーブルにはTEMPORARY
を付与します。
CREATE TEMPORARY LIVE TABLE temp_table
AS SELECT...
連続パイプライン、トリガーパイプライン
Delta Live Tablesは2つの実行モードを備えています:
- トリガーパイプラインは、実行時点で利用可能なデータに基づいてそれぞれのテーブルをアップデートし、パイプラインを実行していたクラスターを停止します。Delta Live Tablesは自動でテーブル間の依存関係を解析し、外部ソースから読み込む依存関係を処理してパイプラインを開始します。パイプライン内のテーブルは依存するデータソースが更新された後に更新されます。
- 連続パイプラインは入力データの変化に合わせて、連続的にテーブルを更新します。アップデートが開始したあとは手動で止めない限り動き続けます。連続パイプラインには常時稼働のクラスターが必要となりますが、後段の利用者は最新のデータを確実に利用できます。
トリガーパイプラインは、パイプラインを実行している間のみクラスターが稼働するので、リソース消費と費用を軽減することができます。しかし、パイプラインがトリガーされない限り新たなデータは処理されません。連続パイプラインには常時稼働のクラスターが必要となり、より多くの費用が発生しますが、処理のレーテンシーは改善されます。
パイプラインのセッティングにおけるcontinuous
フラグで実行モードを制御します。デフォルトではパイプラインはトリガーモードで動作します。 パイプラインにおいて、低いレーテンシーでのテーブルアップデートが必要な場合には、continuous
をtrue
に設定します。
{
...
"continuous": “true”,
...
}
実行モードは処理されるテーブルのタイプに関係がありません。完全、インクリメンタルテーブルの両方をそれぞれの実行モードでアップデートすることができます。
パイプラインにおいてレーテンシーの要件が重要でないテーブルがある場合には、pipelines.trigger.interval
でアップデートの頻度を設定できます:
spark_conf={"pipelines.trigger.interval", "1 hour"}
このオプションで、パイプラインのアップデートの合間でクラスターをオフにすることはできませんが、パイプラインの他のテーブルをアップデートするためのリソースを解放することができます。
連続パイプラインにおける完全テーブル
連続して動作するパイプラインに完全テーブルを含めることは可能です。不要な処理を避けるために、自動でパイプラインは依存するDeltaテーブルをモニタリングし、依存するテーブルのコンテンツが変更された場合にのみアップデートを実行します。
Delta Live Tablesランタイムは非Deltaデータソースの変化を検知することはできません。テーブルは定期的にアップデートされますが、過剰な再計算によってクラスターで実行されるインクリメンタル処理がスローダウンするのを防ぐために、デフォルトトリガーの実行間隔を長めにするようにしてください。
開発、プロダクションモード
開発モード、プロダクションモードを切り替えることでパイプライン実行を最適化することができます。開発モードでパイプラインを実行した際、Delta Live Tablesのシステムは:
- 再起動のオーバーヘッドを回避するためにクラスターを再利用します。
- エラーを検知してすぐに修正できるように、パイプラインのリトライを無効化します。
プロダクションモードで、Delta Live Tablesのシステムは:
- メモリーリーク、認証情報の陳腐化など特定の回復可能なエラーに対してはクラスターを再起動します。
- クラスター起動の失敗など特定のエラーが発生した場合にはリトライを行います。
PipelinesのUIにあるボタンを用いて、開発モードとプロダクションモードを切り替えます。デフォルトはプロダクションモードとなっています。パイプライン設定の一部としてStorage configurationを設定する必要がありますが、これらのモードの変更に影響は受けません。
パイプラインのアクセス権
オプションとしてパイプラインのアクセス権を設定できます。Pipeline Detailsページのボタンをクリックします。ユーザー、グループ、サービスプリンシパルに対して、Can View権限を許可することができます。パイプラインの参照アクセス権を許可するためには、パイプラインのオーナー、あるいはマネージャーである必要があります。
テーブルの公開
パイプラインの出力データを発見可能にし、クエリーを実行できるようにするために、Databricksのメタストアにデータセットを公開することができます。メタストアにデータセットを公開するには、Delta Live Tablesのセッティングにtarget
データベースを指定します。
{
...
"target": "prod_customer_data"
...
}
target
オプションを設定した際には、Delta Live Tablesランタイムはパイプライン上の全てのテーブルと関連するメタデータを公開します。この例では、prod_customer_data.sales
の名前を用いて、sales
テーブルにクエリーを発行することができます。
複数の環境設定を用いることで、環境に基づいて異なるデータベースに公開することが可能です。例えば、dev
データベースを開発データ、prod
データベースをプロダクションデータに公開できます。
target
設定を作成する際、テーブルと関連するメタデータのみが公開されます。ビューはメタストアに公開されません。
パイプラインの実装
ノートブック
DatabricksノートブックでDelta Live Tablesのパイプラインを実装します。一つのノートブックでも複数のノートブックでもパイプラインを実装できます。一つのノートブック上のすべてのクエリーはPyronかSQLで実装される必要がありますが、複数のノートブックパイプラインでは、PythonとSQLのノートブックを混在させることができます。それぞれのノートブックは、アウトプットデータのストレージロケーションを共有し、パイプライン上の他のノートブックからデータセットを参照することができます。
パイプラインの作成、実行方法に関してはDelta Live Tablesクイックスタートを参照ください。複数のノートブックによるパイプラインのサンプルに関してはConfigure multiple notebooks in a pipelineを参照ください。
クエリー
Delta Live TablesにおけるクエリーはPythonあるいはSQLで実装できます。サポートする言語に関してはDelta Live Tables language referenceを参照ください。
Python
Pythonでビュー、テーブルを定義する関数に@view
、@table
デコレータを適用します。テーブルやビューの名前をつけるために、関数名あるいはname
パラメータを使用できます。以下の例では、2つの異なるデータセットを定義します: 入力ソースとしてJSONファイルを受け取るtaxi_raw
ビュー、taxi_raw
ビューを入力とするfiltered_data
テーブルです。
@dlt.view
def taxi_raw():
return spark.read.json("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return dlt.read("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return dlt.read("taxi_raw").where(...)
ビュー、テーブル関数はSparkデータフレームあるいはKoalasデータフレームを返却する必要があります。関数によって返却されるKoalasデータフレームは、Delta Live TablesランタイムによってSparkデータセットに変換されます。
外部データソースから読み込むことに加え、パイプラインで定義された他のデータセットにアクセスするためにDelta Live Tablesのread()
関数を使用することができます。以下の例では、read()
を用いてcustomers_filtered
データセットを作成しています。
@dlt.table
def customers_raw():
return spark.read.csv("/data/customers.csv")
@dlt.table
def customers_filteredA():
return dlt.read("customers_raw").where(...)
同じパイプラインで定義されたデータセット、あるいはメタストアに登録されているテーブルにアクセスするためにspark.table()
関数を使用することができます。パイプラインで定義されているデータセットにアクセする際にspark.table()
を使用する際には、関数の引数においてデータセット名の頭にLIVE
キーワードを追加してください。
@dlt.table
def customers_raw():
return spark.read.csv("/data/customers.csv")
@dlt.table
def customers_filteredB():
return spark.table("LIVE.customers_raw").where(...)
メタストアに登録されているテーブルからデータを読み込むためには、関数の引数からLIVE
キーワードを削除し、オプションとしてデータベース名を伴うテーブル名を指定します。
@dlt.table
def customers():
return spark.table("sales.customers").where(...)
Delta Live Tablesはパイプラインがデータセット間の依存関係を自動的にキャプチャすることを保証します。アップデート実行時、パイプラインのイベントログにおけるリネージュ情報を記録する際、実行順序を決定するために、この依存関係の情報が使用されます。
また、クエリー関数でspark.sql
を用いることでデータセットを返却することができます。内部データセットから読み込みを行うためには、データセット名にLIVE
キーワードを先頭に追加します。
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
ビューとテーブルには以下のオプションプロパティがあります:
-
comment
: データセットに対する説明 -
spark_conf
: このクエリー実行のみ有効なSpark設定を含むPythonディクショナリー - エクスペクテーションで強制されるデータ品質制約
テーブルにおいては、マテリアライズにおける追加のオプションがあります:
-
partition_cols
を用いてテーブルがどのようにパーティションされるのかを指定できます。 - ビュー、テーブルを定義する際にテーブルプロパティを設定できます。詳細はTable propertiesを参照ください。
-
path
が指定されない場合、デフォルトではテーブルデータはパイプラインのストレージロケーションに格納されます。path
を指定することで、別のストレージロケーションを設定できます。 - オプションとして、Pythonの
StructType
あるいは、SQLのDDL文字列を用いて、テーブルのスキーマを定義することができます。以下の例では、明示的にスキーマを指定してsales
テーブルを作成しています。
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
@dlt.table(
comment="Raw data on sales",
schema="customer_id STRING, customer_name STRING, number_of_line_items STRING, order_datetime STRING, order_number LONG")
def sales():
return ("...")
デフォルトでは、スキーマを指定しない場合、Delta Live Tablesはtable
定義からスキーマを推定します。
Python APIはdlt
モジュールで定義されます。Python APIとデータセットプロパティの詳細に関しては、Delta Live Tables言語リファレンスのPythonを参照ください。
Delta Live TablesのPythonノートブックで、外部Pythonライブラリを使用できます。ノートブックスコープPythonライブラリを参照ください。
SQL
SQLでビューやテーブルを作成するには、CREATE LIVE VIEW
、CREATE LIVE TABLE
文を使用します。外部のデータソース、パイプラインで定義されたデータセットから読み込むことでデータセットを作成できます。内部データセットから読み込むデータセット名の前にLIVE
キーワードを追加します。以下の例では、2つの異なるデータセットを定義します: 入力ソースとしてJSONファイルを受け取るtaxi_raw
ビュー、taxi_raw
ビューを入力とするfiltered_data
テーブルです。
CREATE LIVE TABLE taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`
CREATE LIVE TABLE filtered_data
AS SELECT
...
FROM LIVE.taxi_raw
Delta Live Tablesはパイプラインで定義されるデータセット間の依存関係を自動でキャプチャします。アップデート実行時、パイプラインのイベントログにおけるリネージュ情報を記録する際、実行順序を決定するために、この依存関係の情報が使用されます。
ビューとテーブルには以下のオプションプロパティがあります:
-
COMMENT
: データセットに対する説明 - エクスペクテーションで強制されるデータ品質制約
テーブルにおいては、マテリアライズにおける追加のオプションがあります:
-
PARTITIONED BY
を用いてテーブルがどのようにパーティションされるのかを指定できます。パーティショニングによってクエリーが高速になります。 -
TBLPROPERTIES
を用いて、ビュー、テーブルを定義する際にテーブルプロパティを設定できます。詳細はTable propertiesを参照ください。 -
LOCATION
が指定されない場合、デフォルトではテーブルデータはパイプラインのストレージロケーションに格納されます。LOCATION
を指定することで、別のストレージロケーションを設定できます。
テーブルとビューのプロパティの詳細に関しては、Delta Live Tables言語リファレンスのSQLを参照ください。
テーブル、ビューにSpark設定のような設定値を適用したい場合にはSET
を使用します。ノートブックにおいてSET
文が定義された後、あらゆるテーブル、ビューは定義された値にアクセスすることができます。SET
文を用いて指定されたあらゆるSpark設定は、SET文以降のテーブル、ビューに対してSparkクエリーを実行する際に使用されます。クエリーで設定値を読み取るには、内挿文法${}
を使用します。以下の例では、startDate
というSpark設定を定義し、クエリーで使用しています。
SET startDate='2020-01-01';
CREATE LIVE TABLE filtered
AS SELECT * FROM src
WHERE date > ${startDate}
複数の設定値を指定するには、値ごとに別のSET
文を使用します。
外部データソース
データセットを作成するために以下の外部データソースを使用できます。
- Databricksランタイムが直接サポートする任意のデータソース
- Azure Data Lake Storage Gen2 (ADLS Gen2)、AWS S3、Google Cloud Storage (GCS)のようなクラウドストレージ内の任意のファイル
- DBFSに格納されている任意のファイル
サポートされているファイルフォーマットを読み込む際、特に継続的に到着するデータを操作するインクリメンタルテーブルの場合には、Auto Loaderを使用することをお勧めします。Auto Loaderはスケーラブル、効率的であり、スキーマの推定をサポートしています。
Pythonデータセットでは、Auto Loaderでサポートされていないファイルフォーマットからバッチ処理でデータを読み込むために、Apache Sparkにビルトインされているファイルソースを使用することができます。
SQLデータセットでは、Auto LoaderAuto Loaderでサポートされていないファイルフォーマットからバッチ処理でデータを読み込むために、Delta Live Tablesのファイルソースを使用することができます。
Auto Loader
以下の例ではCSV、JSONファイルからデータセットを作成するためにAuto Loaderを使用しています。
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
CREATE INCREMENTAL LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")
CREATE INCREMENTAL LIVE TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")
Auto Loaderでサポートされているフォーマットオプションを使用することができます。以下の例では、タブ区切りのCSVファイルからデータを読み込んでいます。
CREATE INCREMENTAL LIVE TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("cloudFiles.delimiter", "\t"))
Apache Sparkファイルソース
Pythonで、定義されているデータセットからバッチ処理でファイルを読み込むために、標準的なPySpark関数を使用することができます。以下の例では、PySparkのspark.read.parquet()
関数を用いて、Parquetデータをファイルから読み込んでいます。
@dlt.table
def lendingclub_raw_data():
return (
spark.read.parquet("/databricks-datasets/samples/lending_club/parquet/")
)
Spark SQLファイルソース
SQLで、定義されているデータセットからバッチ処理でファイルを読み込むために、Spark SQL文法を使用することができます。
CREATE LIVE TABLE customers
AS SELECT * FROM parquet.`/databricks-datasets/samples/lending_club/parquet/`
データ品質制約の定義
データセットのコンテンツにおけるデータ品質制約を定義するためにエクスペクテーションを使用します。エクスペクテーションは、説明、定数、検証に失敗した際のアクションから構成されます。エクスペクテーションは、Pythonデコレータ、SQLのconstraint句を用いてクエリーに対して適用します。
単一のデータ品質制約を定義するために、Python、SQLクエリーを用いて、expect
、expect or drop
、expect or fail
を使用します。
Pythonパイプラインにおいては、@expect_all
、@expect_all_or_drop
、@expect_all_or_fail
デコレータを用いて、一つ以上のデータ品質制約によるエクスペクテーションを定義できます。
不正レコードを保持
エクスペクテーションに違反したレコードを保持したい場合には、expect
オペレーターを使用します。エクスペクテーションに違反したレコードは、適切なレコードともにターゲットデータセットに追加されます。エクスペクテーションに違反したレコードの数はターゲットデータセットのデータ品質メトリクスで参照できます。
@dlt.expect("valid timestamp", "col(“timestamp”) > '2012-01-01'")
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
不正レコードを削除
不正レコードの処理を避けるためには、expect or drop
オペレーターを使用します。エクスペクテーションに違反したレコードはターゲットデータセットから削除されます。エクスペクテーションに違反したレコードの数はターゲットデータセットのデータ品質メトリクスで参照できます。
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
不正レコードによる停止
不正レコードを許容できない場合には、レコード検証に失敗した際に直ちに実行を停止するexpect or fail
オペレーターを使用します。テーブルアップデートのオペレーションの場合、システムは自動でトランザクションをロールバックします。
@dlt.expect_or_fail("valid_count", "count > 0")
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
エクスペクテーション違反によってパイプラインが失敗した場合、パイプラインを再実行する前に不正なデータを適切に取り扱えるようにパイプラインのコードを修正する必要があります。
停止エクスペクテーションは、違反時の検知、レポートに必要な情報を追跡できるように、変換処理のSparkクエリープランを修正します。多くのクエリーにおいては、どの入力レコードが違反したのかを特定するためにこの情報を利用できます。例外情報のサンプルを以下に示します:
Expectation Violated:
{
"flowName": "a-b",
"verboseInfo": {
"expectationsViolated": [
"x1 is negative"
],
"inputData": {
"a": {"x1": 1,"y1": "a },
"b": {
"x2": 1,
"y2": "aa"
}
},
"outputRecord": {
"x1": 1,
"y1": "a",
"x2": 1,
"y2": "aa"
},
"missingInputData": false
}
}
複数のエクスペクテーション
検証に失敗したレコードをターゲットデータセットに含める複数のデータ品質制約を指定するためにはexpect_all
を使用します。
@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
検証に失敗したレコードをターゲットデータセットから削除する複数のデータ品質制約を指定するためにはexpect_all_or_drop
を使用します。
@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
検証に失敗した際に処理を停止する複数のデータ品質制約を指定するためにはexpect_all_or_fail
を使用します。
@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})
また、制約を変数として定義して、パイプライン上の他のクエリーに渡すことも可能です。
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@expect_all(valid_pages)
@table
def raw_data():
# Create raw dataset
@expect_all_or_drop(valid_pages)
@table
def prepared_data():
# Create cleaned and prepared dataset
Delta Live Tablesによる開発ワークフローの構築
開発、ステージング、プロダクションに対して、Delta Live Tablesの別々の環境を作成できます。これによって、パイプラインを本格運用する前に変換ロジックをテスト、検証することができます。別々の環境を使用する際には、異なるデータベースにつながる別々のパイプラインを作成しますが、同じコードを使用します。
完全に分離された開発環境と、開発からプロダクションに移行するためのシンプルなワークフローを作成するために、この機能とDatabricks Reposの機能を組み合わせることができます。
{
"name": "Data Ingest - DEV user@databricks",
"target": "customers_dev_user",
"libraries": ["/Repos/user@databricks.com/ingestion/etl.py"],
}
{
"name": "Data Ingest - PROD",
"target": "customers",
"libraries": ["/Repos/production/ingestion/etl.py"],
}
パイプラインのパラメータ化
パイプラインの設定で、データセットを定義するPython、SQLコードをパラメータ化することができます。以下のユースケースでパラメータ化を活用することができます。
- コードの変数と長いパスを分離する
- 迅速にテスト行うために、開発、ステージング環境で処理されるデータ量を削減する
- 複数のデータソースに対して処理を行う同じ変換ロジックの再利用
以下の例では、開発パイプラインの入力データのサブセットを限定するために、startDate
設定を使用しています。
CREATE LIVE TABLE customer_events
AS SELECT * FROM sourceTable WHERE date > ${mypipeline.startDate};
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}
インクリメンタルなデータ処理
多くのアプリケーションは、継続的に入力されるデータに基づいてアップデートされるテーブルを必要とします。しかし、データサイズが増加すると、それぞれのアップデートによってデータを再処理するのに必要なリソースが法外なものになります。Delta Live Tablesは、新規データを登録するコストと、新たなデータが利用可能になるまでのレーテンシーを削減するためのインクリメンタルな処理をサポートしています。
パイプラインのアップデートがトリガーされた際、インクリメンタルなテーブルは、前回のアップデート以降に到着した新規データのみを処理します。処理済みのデータはDelta Live Tablesランタイムによって追跡されます。
外部データソースによるインクリメンタルデータセット
一つ以上の入力をストリームとしてクエリーに読み込むことでインクリメンタルデータセットを定義できます。
inputPath = "/databricks-datasets/structured-streaming/events/"
@dlt.table
def streaming_bronze_table():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(inputPath)
CREATE INCREMENTAL LIVE TABLE streaming_bronze_table
AS SELECT * FROM cloud_files("/databricks-datasets/structured-streaming/events/", "json")
パイプライン内でのインクリメンタルデータセット
パイプラインの他のデータセットからインクリメンタルに読み込むことができます。
@dlt.table
def streaming_silver_table:
return dlt.read_stream("streaming_bronze_table").where(...)
CREATE INCREMENTAL LIVE TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(LIVE.streaming_bronze_table)
WHERE ...
フルリフレッシュの実行
例えば、新たなカラムを計算することでバグを修正したり、新たな要件に基づいてクエリーを変更したりして、既に登録されたデータを再度処理したい場合があるかもしれません。UIからフルリフレッシュを実行することをDelta Live Tablesに指示することで、既に登録されたデータを再処理することができます。
完全テーブルとインクリメンタルテーブルの混在
一つのパイプラインに異なるタイプのテーブルを混在させることができます。例えば、パイプラインの最初のデータセット、一般的にブロンズテーブルと呼ばれます、は多くの場合シンプルな変換処理を実行します。JSONなど効率的ではないフォーマットの再処理は、シンプルな変換であっても法外なものになる場合があるため、インクリメンタルテーブルが最適な選択肢と言えます。
逆に、パイプラインの最後のテーブル、一般的にゴールドテーブルと呼ばれます、は多くの場合、Sparkの構造化ストリーミングでサポートされていないような複雑な集計処理が必要となります。これらの変換処理は完全テーブルとしてマテリアライズした方が望ましいと言えます。
2つのタイプのテーブルを一つのパイプラインに混在させることで、コストの高いデータの再投入や、生データの再処理を避けることができ、効率的にエンコード、フィルタリングされたデータセットに対して、Spark SQLのパワーを最大限に活用した複雑な集計処理を行うことが可能となります。以下の例では、このタイプの混合処理を示しています。
@dlt.view
def incremental_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("s3://path/to/raw/data")
)
@dlt.table
def incremental_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return dlt.read_stream("incremental_bronze").where(...)
@dlt.table
def complete_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return dlt.read("incremental_silver").groupBy("user_id").count()
CREATE INCREMENTAL LIVE VIEW incremental_bronze
AS SELECT * FROM cloud_files(
"s3://path/to/raw/data", "json")
CREATE LIVE TABLE incremental_silver
AS SELECT * FROM LIVE.incremental_bronze WHERE...
CREATE LIVE TABLE complete_gold
AS SELECT count(*) FROM LIVE.incremental_silver GROUP BY user_id
インクリメンタル処理のためにS3からJSONを効率的に読み込むためのオートローダーに関しては、Load files from S3 using Auto Loaderを参照ください。
インクリメンタルなJOIN
Delta Live Tablesはテーブルをアップデートするための様々な戦略をサポートしています。
ストリーム、バッチのJOIN
ストリーム、バッチのJOINは、静的ディメンジョンテーブルと、連続するストリームデータを非正規化する際には適切な選択肢と言えます。派生データセットがアップデートされる都度、ストリームからの新たなレコードは、アップデートが開始した時点のバッチテーブルの静的なスナップショットとJOINされます。静的テーブルで追加、更新されたレコードは、フルリフレッシュが実行されるまではテーブルに反映されません。
ストリーム、バッチJOINの例を以下に示します。
@dlt.table
def customer_sales():
return dlt.read_stream("sales").join(read("customers"), ["customer_id"], "left")
CREATE INCREMENTAL LIVE TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
INNER JOIN LEFT READ LIVE.customers ON customer_id
連続パイプラインでは、マイクロバッチごとのアップデートから定期的にJOINのバッチ側が取得されます。
インクリメンタルな集計
count、min、max、sumのような分配集計と、平均、標準偏差のような代数集計はインクリメンタルに計算することができます。限られた数のグループ、例えば、GROUP BY country
のようなクエリーに対する集計はインクリメンタルに行うことをお勧めします。
それぞれのアップデートで、新規入力データのみが読み込まれますが、背後のDeltaテーブルは完全に上書きされます。