Delta Live Tables Python language reference | Databricks on AWS [2022/3/2時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
プレビュー
本機能はパブリックプレビューです。サインアップにはDelta Live Tablesへのアクセスをリクエストしてください。
本書ではDelta Live TablesのPythonプログラミングインタフェースの詳細とサンプルを説明します。完全なAPI仕様についてはPython API仕様を参照ください。
SQL APIに関してはDelta Live Tables SQL language referenceを参照ください。
Pythonデータセット
Python APIはdlt
モジュールで定義されています。Python APIで実装されているDelta Live Tablseパイプラインでdlt
モジュールをインポートする必要があります。Pythonでビューあるいはテーブルを定義するには関数に対して@dlt.view
あるいは@dlt.table
デコレーターを適用します。テーブル名、ビュー名を指定するには関数名あるいはname
パラメーターを使用できます。以下の例では二つの異なるデータセットを定義しています。入力ソースとしてJSONファイルを受け取るtaxi_raw
というビューと、taxi_raw
ビューを入力とするfiltered_data
というテーブルです。
@dlt.view
def taxi_raw():
return spark.read.json("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return dlt.read("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return dlt.read("taxi_raw").where(...)
ビューとテーブルは、SparkデータフレームかKoalasデータフレームを返却する必要があります。関数によって返却されるKoalasデータフレームは、Delta Live TablesランタイムによってSparkデータセットに変換されます。
外部データソースからの読み込みに加え、Delta Live Tablseのread()
関数を用いて、同じパイプラインで定義されたデータセットにアクセスすることができます。以下の例では、read()
関数によるcustomers_filtered
の作成をデモします。
@dlt.table
def customers_raw():
return spark.read.csv("/data/customers.csv")
@dlt.table
def customers_filteredA():
return dlt.read("customers_raw").where(...)
同じパイプラインで定義されたデータセット、あるいは、メタストアに登録されたテーブルにアクセスするためにspark.table()
を使用することもできます。パイプラインに定義されているデータセットにアクセスするためにspark.table()
を用いる際、関数の引数において、データセット名の先頭にLIVE
を追加します。
@dlt.table
def customers_raw():
return spark.read.csv("/data/customers.csv")
@dlt.table
def customers_filteredB():
return spark.table("LIVE.customers_raw").where(...)
メタストアに登録されているテーブルからデータを読み込むには、関数の引数からLIVE
キーワードを削除し、データベース名(オプション)を伴うテーブル名を指定します。
@dlt.table
def customers():
return spark.table("sales.customers").where(...)
Delta Live Tablesは、パイプラインが自動でデータセット間の依存関係を捕捉することを保証します。この依存関係情報は、アップデートを実行する際の実行順序を決定し、パイプラインのイベントログにリネージュ情報を記録するために用いられます。
クエリー関数でspark.sql
表現を用いてデータセットを返却することができます。内部データセットから読み込みを行うには、データセット名にLIVE.
を追加します。
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
ビューとテーブルの両方には、以下のオプションのプロパティが存在します。
-
comment
: 人間が理解できるデータセットの説明。 -
spark_conf
: 当該クエリーの実行にのみ適用されるSpark設定を含むPythonのディクショナリー。 - エクスペクテーションによって強制されるデータ品質制約。
テーブルでは、自身のマテリアライゼーションに対する追加の制御手段を提供しています。
-
partition_cols
を用いて、どのようにテーブルのパーティションを作成するのかを指定します。クエリーの性能を改善するためにパーティショニングを活用することができます。 - ビュー、テーブルを定義する際にテーブルプロパティを設定することができます。詳細はテーブルプロパティを参照ください。
-
path
セッティングを用いることでテーブルデータの格納場所を設定します。デフォルトでは、path
が設定されていない場合、テーブルデータはパイプラインの格納場所に保存されます。 - Pythonの
StructType
あるいはSQL DDL文字列を用いて、テーブルスキーマをオプションとして指定することができます。以下の例では、明示的に指定したスキーマを用いてsales
というテーブルを作成します。
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
@dlt.table(
comment="Raw data on sales",
schema="customer_id STRING, customer_name STRING, number_of_line_items STRING, order_datetime STRING, order_number LONG")
def sales():
return ("...")
デフォルトでは、Delta Live Tablesはスキーマを指定しない場合、table
定義からスキーマを推定します。
Pythonライブラリ
外部のPythonライブラリを指定するには、%pip install
マジックコマンドを使用します。アップデートがスタートすると、Delta Live Tablesはいかなるテーブル定義を実行する前に、%pip install
コマンドを含む全てのセルを実行します。パイプラインに含まれる全てのPythonノートブックは、インストールされた全てのライブラリにアクセスできます。以下の例では、パイプラインの全てのPythonノートブックから利用できるlogger
というパッケージをインストールしています。
%pip install logger
from logger import log_info
@dlt.table
def dataset():
log_info(...)
return dlt.read(..)
Python API仕様
Pythonモジュール
Delta Live TablesのPython関数はdlt
モジュールで定義されています。Python APIを用いて実装されたパイプラインでは、このモジュールをインポートする必要があります。
import dlt
テーブルの作成
Pythonでテーブルを定義するには、@table
デコレーターを適用します。@table
デコレーターは@create_table
デコレーターのエイリアスです。
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
schema="schema-definition",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
ビューの作成
Pythonでビューを定義するには、@view
デコレーターは@create_view
デコレーターのエイリアスです。
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Pythonプロパティ
@table あるいは@view
|
---|
name Type: str テーブルかビューに対する名前(オプション)。指定されていない場合、関数名がテーブル、ビュー名として用いられます。 |
comment Type: str テーブルの説明(オプション)。 |
spark_conf Type: dict このクエリーの実行に対するSpark設定のリスト(オプション)。 |
table_properties Type: dict テーブルに対するテーブルプロパティのリスト(オプション)。 |
path Type: str テーブルデータの格納場所(オプション)。設定されない場合、システムはパイプラインの格納場所を使用します。 |
partition_cols Type: array テーブルのパーティションに使用する1つ以上のカラムのリスト(オプション)。 |
schema Type: str かStructType テーブルに対するスキーマ定義(オプション)。SQL DDL文字列、Pythonの StructType として定義されるスキーマ。 |
temporary Type: bool 一時テーブルの作成。このテーブルに対するメタデータは永続化されません。 |
テーブル、ビュー定義 |
---|
def () データセットを定義するPython関数。 name パラメーターが設定されていない場合、<function-name> がターゲットのデータセット名となります。 |
query Sparkデータセット、あるいはKoalasデータフレームを返却するSpark SQL文。 同じパイプラインで定義されるデータセットに対するcomplete読み込みを行うには、 dlt.read() かspark.table() を使用します。同じパイプラインで定義されているデータセットから読み込む際には、spark.table()関数の引数のデータセット名の先頭にLIVE キーワードを追加します。例えば、customers というデータセットから読み込む際には、spark.table("LIVE.customers") また、メタストアに登録されているテーブルから読み込むケースで spark.table() 関数を使う際には、LIVE キーワードを削除して、データベース名(オプション)を伴うテーブル名を指定します。spark.table("sales.customers") 同じパイプラインで定義されているデータセットからインクリメンタルにデータセットを読み込む際は、 dlt.read_stream() を使用します。返却するデータセットを作成するためにSQLクエリーを定義するには spark.sql 関数を使用します。PythonによるDelta Live Tablesクエリーを定義するにはPySpark文法を使用します。 |
エクスペクテーション |
---|
@expect(“description”, “constraint”) description で識別されるデータ品質制約を宣言します。レコードがエクスペクテーションに違反した場合、ターゲットデータセットに当該レコードを含めます。 |
@expect_or_drop(“description”, “constraint”) description で識別されるデータ品質制約を宣言します。レコードがエクスペクテーションに違反した場合、ターゲットデータセットから当該レコードを削除します。 |
@expect_or_fail(“description”, “constraint”) description で識別されるデータ品質制約を宣言します。レコードがエクスペクテーションに違反した場合、即座に処理を停止します。 |
@expect_all(expectations) 1つ以上のデータ品質制約を宣言します。 expectations はPythonディクショナリーであり、キーがエクスペクテーションの名称となり、値がエクスペクテーションの制約となります。いずれかのエクスペクテーションに違反したレコードはターゲットデータセットに含まれます。 |
@expect_all_or_drop(expectations) 1つ以上のデータ品質制約を宣言します。 expectations はPythonディクショナリーであり、キーがエクスペクテーションの名称となり、値がエクスペクテーションの制約となります。いずれかのエクスペクテーションに違反したレコードはターゲットデータセットから削除されます。 |
@expect_all_or_fail(expectations) 1つ以上のデータ品質制約を宣言します。 expectations はPythonディクショナリーであり、キーがエクスペクテーションの名称となり、値がエクスペクテーションの制約となります。いずれかのエクスペクテーションにレコードが違反した場合、即座に処理を停止します。 |
テーブルプロパティ
Delta Lakeでサポートされているテーブルプロパティに加え、以下のテーブルプロパティを設定することができます。
テーブルプロパティ |
---|
pipelines.autoOptimize.managed デフォルト値: true 当該デーブルに対する自動スケジュール最適化を有効化するか、無効化するかを指定します。 |
pipelines.autoOptimize.zOrderCols デフォルト値: None 当該テーブルでz-orderを行うカラムをカンマ区切りで指定します(オプション)。 |
pipelines.reset.allowed デフォルト値: true 当該テーブルに対するフルリフレッシュを許可するかどうかを制御します。 |
pipelines.trigger.interval デフォルト値: フロータイプに依存します。 当該テーブルのアップデートを行うフローのトリガー周期を制御します。デフォルト値は以下のようになります。
|