Delta Live Tables settings | Databricks on AWS [2023/2/24時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Delta Live Tablesの設定では、パイプラインを実装する一つ以上のノートブック、開発、ステージング、プロダクション環境でパイプラインをどのように実行するのかを設定するパラメーターを指定します。Delta Live Tablesの設定はJSONで表現され、Delta Live TablesのUIで編集することができます。
設定
| フィールド |
|---|
|
id Type: stringグローバルでユニークなパイプラインのID。IDはシステムによって割り当てられ変更することはできません。 |
|
name Type: stringわかりやすいパイプラインの名称。UIでパイプラインジョブを識別するために使用されます。 |
|
storage Type: stringパイプラインの実行に必要な出力データ、メタデータを格納するDBFSあるいはクラウドストレージ。 storageが設定されない場合、システムがデフォルトのdbfs:/pipelines/を使用します。パイプラインの作成後に storageを変更することはできません。 |
|
configuration Type: object パイプラインを実行するクラスターのSpark設定を追加するためのオプションの設定リストです。これらの設定はDelta Live Tablesランタイムに読み込まれ、Spark設定を通じてパイプラインクエリーで使用されます。 各々の要素は key:valueの形式である必要があります。configurationの使用例については、パイプラインのパラメータ化を参照ください。 |
|
libraries Type: array of objectsパイプラインのコードと必要とされるアーティファクトを含むノートブックの配列を指定します。サンプルはパイプラインの複数のノートブックの設定を参照ください。 |
|
clusters Type: array of objectsパイプラインを実行するクラスターの設定の配列です。詳細はクラスター設定を参照ください。 |
|
development Type: booleanパイプラインを developmentあるいはproductionモードで実行するかを示すフラグです。開発、プロダクションモードをご覧ください。 |
|
continuous Type: booleanパイプラインを連続で稼働させるかどうかを示すフラグです。 デフォルトは falseです。 |
|
target Type: stringパイプラインの出力データを永続化するデータベース名です。 targetを設定する事で、DatabricksのUIからパイプラインの出力データを参照、検索することが可能となります。 |
|
channel Type: string使用するDelta Live Tablesランタイムのバージョンです。サポートされる値は:
channelフィールドはオプションです。デフォルトはcurrentです。プロダクションのワークロードではcurrentバージョンのランタイムを使用することをお勧めします。 |
|
edition Type: stringパイプラインを実行するDelta Live Tablesの製品エディションです。この設定によって、パイプラインの要件に応じたベストな製品エディションを選択することができます。
editionフィールドはオプションです。デフォルトはADVANCEDです。 |
|
photon Type: booleanパイプラインを実行するためにPhotonランタイムを使用するかどうかを指定するフラグです。PhotonはDatabricksの高性能Sparkエンジンです。Photonが有効化されたパイプラインの課金と、有効化されていないパイプラインの課金は異なります。 photonフィールドはオプションです。デフォルトはfalseです。 |
|
pipelines.maxFlowRetryAttempts Type: intリトライ可能な障害が発生した際、パイプラインのアップデートが失敗する前にフローのリトライを試みる最大数です。 デフォルト値は2です。デフォルトでは、リトライ可能な障害が発生した場合、Delta Live Tablesランタイムは、初回の試行を含み3回フローの実行を試みます。 |
|
pipelines.numUpdateRetryAttempts Type: intリトライ可能な障害が発生した際、パイプラインのアップデートが失敗する前にアップデートのリトライを試みる最大数です。リトライはフルアップデートとして実行されます。 デフォルト値は5です。このパラメーターはproductionモードのアップデート実行の際にのみ適用されます。パイプライン実行がdevelopmentモードの場合にはリトライは行われません。 |
パイプラインのトリガー間隔
テーブルをアップデートするフロー、あるいはパイプライン全体のトリガー間隔をコントロールするために、pipelines.trigger.intervalを活用することができます。起動されるパイプラインはそれぞれのテーブルを一度のみ処理するので、pipelines.trigger.intervalは連続パイプラインの場合にのみ使用されます。
ストリーミングクエリーとバッチクエリーとでは異なるデフォルト値があるため、個々のテーブルでpipelines.trigger.intervalを設定することをお勧めします。処理においてパイプラインのグラフ全体のアップデートをコントロールする必要がある場合にのみ、パイプラインにこの値を設定してください。
Pythonのspark_conf、SQLのSETを用いてテーブルにpipelines.trigger.intervalを設定します。
@dlt.table(
spark_conf={"pipelines.trigger.interval" : "10 seconds"}
)
def <function-name>():
return (<query>)
SET pipelines.trigger.interval='10 seconds';
CREATE OR REFRESH LIVE TABLE TABLE_NAME
AS SELECT ...
パイプラインにpipelines.trigger.intervalを設定するには、パイプライン設定のconfigurationオブジェクトに追加します。
{
"configuration": {
"pipelines.trigger.interval": "10 seconds"
}
}
| pipelines.trigger.interval |
|---|
デフォルト値はフロータイプによって変わります:
|
クラスター設定
Create cluster APIと同じJSONフォーマットを用いて、パイプラインで使用されるクラスターを設定することができます。二種類のクラスタータイプに対する設定を行うことができます。全ての処理が行われるdefaultクラスターと、日次のメンテナンスタスクを実行するmaintenanceクラスターです。それぞれのクラスターはlabelフィールドで識別することができます。
クラスターのプロパティの指定はオプションであり、指定されない値に関してはシステムはデフォルト値を使用します。
注意
-
クラスター設定でSparkのバージョンを指定することはできません。Delta Live Tablesのクラスターは、最新機能を取り込むように定期的にアップデートされるDatabricksランタイムのカスタムバージョンで動作します。マニュアルでバージョンを設定するとパイプラインが失敗する場合があります。
-
Delta Live Tablesのクラスターは使用されていない時に自動でシャットダウンされるため、クラスター設定の
autotermination_minutesを設定するクラスターポリシーの参照はエラーとなります。クラスターのシャットダウンの挙動を制御するために、developmentモードかproductionモードを使用するか、Spark設定pipelines.clusterShutdown.delayを使用することができます。以下の例では、pipelines.clusterShutdown.delayを5秒に設定しています。JSON{ "configuration": { "pipelines.clusterShutdown.delay": "60s" }productionモードが有効化されている場合、pipelines.clusterShutdown.delayのデフォルト値は0 secondsです。developmentモードが有効化されている場合、デフォルト値は2 hoursになります。 -
クラスター設定で
num_workersを0に設定すると、クラスターはシングルノードクラスターとして作成されます。オートスケーリングクラスターを設定し、min_workersを0、max_workersを0に設定してもシングルノードクラスターが作成されます。オートスケーリングクラスターを設定し、
min_workersのみを0に設定すると、クラスターはシングルノードクラスターとしては作成されません。このクラスターは停止されるまで常に1台のワーカーが存在することになります。Delta Live Tablesでシングルノードクラスターを作成するクラスター設定のサンプルです。
JSON{ "clusters": [ { "label": "default", "num_workers": 0 } ] }
注意
お使いのストレージにアクセスするためにインスタンスプロファイルや他の設定が必要な場合には、defaultクラスター、maintenanceクラスターの両方に設定してください。
defaultクラスター、maintenanceクラスターの設定例を以下に示します。
{
"clusters": [
{
"label": "default",
"node_type_id": "c5.4xlarge",
"driver_node_type_id": "c5.4xlarge",
"num_workers": 20,
"spark_conf": {
"spark.databricks.io.parquet.nativeReader.enabled": "false"
},
"aws_attributes": {
"instance_profile_arn": "arn:aws:..."
}
},
{
"label": "maintenance",
"aws_attributes": {
"instance_profile_arn": "arn:aws:..."
}
}
]
}
クラウドストレージの設定
AWSのS3ストレージへのアクセスを設定するには、AWSインスタンスプロファイルを使用します。Delta Live Tables APIあるいはDelta Live Tables UIでパイプラインを作成、編集する際にインスタンスプロファイルの設定を追加することができます。また、パイプラインクラスターのJSON設定を直接編集することでAWSインスタンスプロファイルを設定することができます。
- パイプラインのPipeline detailsページで、Settingsボタンをクリックします。Pipeline settingsページが表示されます。
- JSONボタンをクリックします。
- クラスター設定の
aws_attributes.instance_profile_arnフィールドに、インスタンスプロファイル設定を入力します。
{
"clusters": [
{
"label": "default",
"aws_attributes": {
"instance_profile_arn": "arn:aws:..."
}
},
{
"label": "maintenance",
"aws_attributes": {
"instance_profile_arn": "arn:aws:..."
}
}
]
}
defaultとmaintenanceクラスターに対してインスタンスプロファイルを指定する必要があります。
Delta Live Tablesパイプラインのクラスターポリシーを作成する際に、インスタンスプロファイルを設定することもできます。サンプルに関しては、knowledge baseをご覧ください。
クラスターポリシー
注意
Delta Live Tablesクラスターを設定するためにクラスターポリシーを使う際には、defaultとmaintenanceクラスターの両方で単一のポリシーを適用することをお勧めします。
パイプラインクラスターでクラスターポリシーを設定するには、cluster_typeフィールドをdltに設定したポリシーを作成します。
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
}
}
パイプライン設定で、clusterのpolicy_idフィールドをポリシーIDの値に設定します。以下の例では、IDC65B864F02000008のクラスターポリシーを用いてdefaultとmaintenanceクラスターを設定します。
{
"clusters": [
{
"label": "default",
"policy_id": "C65B864F02000008",
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
},
{
"label": "maintenance",
"policy_id": "C65B864F02000008"
}
]
}
クラスターポリシーの作成、使用のサンプルについては、Define limits on pipeline clustersをご覧ください。
サンプル
パイプラインとクラスターの設定
以下の例では、ストレージとしてのDBFS、小規模の1ノードのクラスターを使用して、example-notebook_1で実装されたパイプラインを実行するトリガーモードのパイプラインを設定しています。
{
"name": "Example pipeline 1",
"storage": "dbfs:/pipeline-examples/storage-location/example1",
"clusters": [
{
"num_workers": 1,
"spark_conf": {}
}
],
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/example_notebook_1"
}
}
],
"continuous": false
}
パイプラインの複数のノートブックの設定
複数のノートブックを用いたパイプラインを設定するにはlibrariesフィールドを使用します。Delta Live Tablesは、パイプラインに対する処理グラフを構築するためにデータセットの依存関係を自動で解析するので、ノートブックの順序は問いません。以下の例では、example-notebook_1とexample-notebook_2で定義されるデータセットを含むパイプラインを作成します。
{
"name": "Example pipeline 3",
"storage": "dbfs:/pipeline-examples/storage-location/example3",
"libraries": [
{ "notebook": { "path": "/example-notebook_1" } },
{ "notebook": { "path": "/example-notebook_2" } }
]
}
ワークスペースファイルからコードをインポート
パイプラインコードを含むノートブックを設定するためにlibrariesフィールドを用いることに加え、Databricks repoに格納されているソースコードに含まれるパイプラインコードを設定することができます。パイプラインコードを含むソースファイルへのパスを指定するには、fileフィールドを使用します。
{
"libraries": [
{ "file": { "path": "/Repos/<user_name>@databricks.com/Apply_Changes_Into/apply_changes_into.sql" } },
{ "file": { "path": "/Repos/<user_name>@databricks.com/Apply_Changes_Into/apply_changes_into.py" } }
]
}
Delta Live Tablesによる開発ワークフローの作成
開発、ステージング、プロダクションに対応する別々のDelta Live Tablesパイプラインを作成することができるので、作成するデータのコンシューマーに影響を与えることなしに、変換ロジックをテスト、デバッグすることができます。異なるデータベースをターゲットとしつつも同じコードを使用する別々のパイプラインを作成するだけです。
完全に分離された開発環境と、開発環境からプロダクション環境にプッシュするシンプルなワークフローを構築するためにDatabricksのReposの機能と組み合わせることができます。
{
"name": "Data Ingest - DEV user@databricks",
"target": "customers_dev_user",
"libraries": ["/Repos/user@databricks.com/ingestion/etl.py"],
}
{
"name": "Data Ingest - PROD",
"target": "customers",
"libraries": ["/Repos/production/ingestion/etl.py"],
}
パイプラインのパラメータ化
データセットを定義するPython、SQLコードはパイプラインの設定でパラメータ化することができます。パラメータ化を用いることで以下のユースケースを実現することができます。
- コードと長いパスや変数の分離
- 開発、ステージング環境で処理するデータ量を削減し、テストを高速化
- 複数のデータソースに対する処理で同じ変換ロジックを再利用
以下の例では、開発用パイプラインで使用するデータを入力データのサブセットに限定するために、設定値startDateを使用しています。
CREATE LIVE TABLE customer_events
AS SELECT * FROM sourceTable WHERE date > ${mypipeline.startDate};
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}