0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Delta Live Tables Python language reference | Databricks on AWS [2022/3/2時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

プレビュー
本機能はパブリックプレビューです。サインアップにはDelta Live Tablesへのアクセスをリクエストしてください。

本書ではDelta Live TablesのPythonプログラミングインタフェースの詳細とサンプルを説明します。完全なAPI仕様についてはPython API仕様を参照ください。

SQL APIに関してはDelta Live Tables SQL language referenceを参照ください。

Pythonデータセット

Python APIはdltモジュールで定義されています。Python APIで実装されているDelta Live Tablseパイプラインでdltモジュールをインポートする必要があります。Pythonでビューあるいはテーブルを定義するには関数に対して@dlt.viewあるいは@dlt.tableデコレーターを適用します。テーブル名、ビュー名を指定するには関数名あるいはnameパラメーターを使用できます。以下の例では二つの異なるデータセットを定義しています。入力ソースとしてJSONファイルを受け取るtaxi_rawというビューと、taxi_rawビューを入力とするfiltered_dataというテーブルです。

Python
@dlt.view
def taxi_raw():
  return spark.read.json("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

ビューとテーブルは、SparkデータフレームかKoalasデータフレームを返却する必要があります。関数によって返却されるKoalasデータフレームは、Delta Live TablesランタイムによってSparkデータセットに変換されます。

外部データソースからの読み込みに加え、Delta Live Tablseのread()関数を用いて、同じパイプラインで定義されたデータセットにアクセスすることができます。以下の例では、read()関数によるcustomers_filteredの作成をデモします。

Python
@dlt.table
def customers_raw():
  return spark.read.csv("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

同じパイプラインで定義されたデータセット、あるいは、メタストアに登録されたテーブルにアクセスするためにspark.table()を使用することもできます。パイプラインに定義されているデータセットにアクセスするためにspark.table()を用いる際、関数の引数において、データセット名の先頭にLIVEを追加します。

Python
@dlt.table
def customers_raw():
  return spark.read.csv("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

メタストアに登録されているテーブルからデータを読み込むには、関数の引数からLIVEキーワードを削除し、データベース名(オプション)を伴うテーブル名を指定します。

Python
@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Delta Live Tablesは、パイプラインが自動でデータセット間の依存関係を捕捉することを保証します。この依存関係情報は、アップデートを実行する際の実行順序を決定し、パイプラインのイベントログにリネージュ情報を記録するために用いられます。

クエリー関数でspark.sql表現を用いてデータセットを返却することができます。内部データセットから読み込みを行うには、データセット名にLIVE.を追加します。

Python
@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

ビューとテーブルの両方には、以下のオプションのプロパティが存在します。

  • comment: 人間が理解できるデータセットの説明。
  • spark_conf: 当該クエリーの実行にのみ適用されるSpark設定を含むPythonのディクショナリー。
  • エクスペクテーションによって強制されるデータ品質制約。

テーブルでは、自身のマテリアライゼーションに対する追加の制御手段を提供しています。

  • partition_colsを用いて、どのようにテーブルのパーティションを作成するのかを指定します。クエリーの性能を改善するためにパーティショニングを活用することができます。
  • ビュー、テーブルを定義する際にテーブルプロパティを設定することができます。詳細はテーブルプロパティを参照ください。
  • pathセッティングを用いることでテーブルデータの格納場所を設定します。デフォルトでは、pathが設定されていない場合、テーブルデータはパイプラインの格納場所に保存されます。
  • PythonのStructTypeあるいはSQL DDL文字列を用いて、テーブルスキーマをオプションとして指定することができます。以下の例では、明示的に指定したスキーマを用いてsalesというテーブルを作成します。
Python
sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

@dlt.table(
  comment="Raw data on sales",
  schema="customer_id STRING, customer_name STRING, number_of_line_items STRING, order_datetime STRING, order_number LONG")
def sales():
  return ("...")

デフォルトでは、Delta Live Tablesはスキーマを指定しない場合、table定義からスキーマを推定します。

Pythonライブラリ

外部のPythonライブラリを指定するには、%pip installマジックコマンドを使用します。アップデートがスタートすると、Delta Live Tablesはいかなるテーブル定義を実行する前に、%pip installコマンドを含む全てのセルを実行します。パイプラインに含まれる全てのPythonノートブックは、インストールされた全てのライブラリにアクセスできます。以下の例では、パイプラインの全てのPythonノートブックから利用できるloggerというパッケージをインストールしています。

Python
%pip install logger

from logger import log_info

@dlt.table
def dataset():
    log_info(...)
    return dlt.read(..)

Python API仕様

Pythonモジュール

Delta Live TablesのPython関数はdltモジュールで定義されています。Python APIを用いて実装されたパイプラインでは、このモジュールをインポートする必要があります。

Python
import dlt

テーブルの作成

Pythonでテーブルを定義するには、@tableデコレーターを適用します。@tableデコレーターは@create_tableデコレーターのエイリアスです。

Python
import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  schema="schema-definition",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

ビューの作成

Pythonでビューを定義するには、@viewデコレーターは@create_viewデコレーターのエイリアスです。

Python
import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Pythonプロパティ

@tableあるいは@view
name
Type: str
テーブルかビューに対する名前(オプション)。指定されていない場合、関数名がテーブル、ビュー名として用いられます。
comment
Type: str
テーブルの説明(オプション)。
spark_conf
Type: dict
このクエリーの実行に対するSpark設定のリスト(オプション)。
table_properties
Type: dict
テーブルに対するテーブルプロパティのリスト(オプション)。
path
Type: str
テーブルデータの格納場所(オプション)。設定されない場合、システムはパイプラインの格納場所を使用します。
partition_cols
Type: array
テーブルのパーティションに使用する1つ以上のカラムのリスト(オプション)。
schema
Type: strStructType
テーブルに対するスキーマ定義(オプション)。SQL DDL文字列、PythonのStructTypeとして定義されるスキーマ。
temporary
Type: bool
一時テーブルの作成。このテーブルに対するメタデータは永続化されません。
テーブル、ビュー定義
def ()
データセットを定義するPython関数。nameパラメーターが設定されていない場合、<function-name>がターゲットのデータセット名となります。
query
Sparkデータセット、あるいはKoalasデータフレームを返却するSpark SQL文。

同じパイプラインで定義されるデータセットに対するcomplete読み込みを行うには、dlt.read()spark.table()を使用します。同じパイプラインで定義されているデータセットから読み込む際には、spark.table()関数の引数のデータセット名の先頭にLIVEキーワードを追加します。例えば、customersというデータセットから読み込む際には、

spark.table("LIVE.customers")

また、メタストアに登録されているテーブルから読み込むケースでspark.table()関数を使う際には、LIVEキーワードを削除して、データベース名(オプション)を伴うテーブル名を指定します。

spark.table("sales.customers")

同じパイプラインで定義されているデータセットからインクリメンタルにデータセットを読み込む際は、dlt.read_stream()を使用します。

返却するデータセットを作成するためにSQLクエリーを定義するにはspark.sql関数を使用します。

PythonによるDelta Live Tablesクエリーを定義するにはPySpark文法を使用します。
エクスペクテーション
@expect(“description”, “constraint”)

descriptionで識別されるデータ品質制約を宣言します。レコードがエクスペクテーションに違反した場合、ターゲットデータセットに当該レコードを含めます。
@expect_or_drop(“description”, “constraint”)

descriptionで識別されるデータ品質制約を宣言します。レコードがエクスペクテーションに違反した場合、ターゲットデータセットから当該レコードを削除します。
@expect_or_fail(“description”, “constraint”)

descriptionで識別されるデータ品質制約を宣言します。レコードがエクスペクテーションに違反した場合、即座に処理を停止します。
@expect_all(expectations)

1つ以上のデータ品質制約を宣言します。expectationsはPythonディクショナリーであり、キーがエクスペクテーションの名称となり、値がエクスペクテーションの制約となります。いずれかのエクスペクテーションに違反したレコードはターゲットデータセットに含まれます。
@expect_all_or_drop(expectations)

1つ以上のデータ品質制約を宣言します。expectationsはPythonディクショナリーであり、キーがエクスペクテーションの名称となり、値がエクスペクテーションの制約となります。いずれかのエクスペクテーションに違反したレコードはターゲットデータセットから削除されます。
@expect_all_or_fail(expectations)

1つ以上のデータ品質制約を宣言します。expectationsはPythonディクショナリーであり、キーがエクスペクテーションの名称となり、値がエクスペクテーションの制約となります。いずれかのエクスペクテーションにレコードが違反した場合、即座に処理を停止します。

テーブルプロパティ

Delta Lakeでサポートされているテーブルプロパティに加え、以下のテーブルプロパティを設定することができます。

テーブルプロパティ
pipelines.autoOptimize.managed
デフォルト値: true
当該デーブルに対する自動スケジュール最適化を有効化するか、無効化するかを指定します。
pipelines.autoOptimize.zOrderCols
デフォルト値: None
当該テーブルでz-orderを行うカラムをカンマ区切りで指定します(オプション)。
pipelines.reset.allowed
デフォルト値: true
当該テーブルに対するフルリフレッシュを許可するかどうかを制御します。
pipelines.trigger.interval
デフォルト値: フロータイプに依存します。
当該テーブルのアップデートを行うフローのトリガー周期を制御します。デフォルト値は以下のようになります。
  • ストリーミングクエリーは4秒
  • 全ての入力データがDeltaソースであり、completeクエリーの場合は1分
  • 非Deltaのデータソースが含まれており、completeクエリーの場合は10分。Complete tables in continuous pipelinesをご覧ください。
値は数値と時間ユニットになります。以下に適切な時間ユニットを示します。
  • second, seconds
  • minute, minutes
  • hour, hours
  • day, days
値を定義する際、以下のように、単数形、複数形のユニットを指定することができます。
  • {"pipelines.trigger.interval" : "1 hour"}
  • {"pipelines.trigger.interval" : "10 seconds"}
  • {"pipelines.trigger.interval" : "30 second"}
  • {"pipelines.trigger.interval" : "1 minute"}
  • {"pipelines.trigger.interval" : "10 minutes"}
  • {"pipelines.trigger.interval" : "10 minute"}

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?