Delta Live Tables settings | Databricks on AWS [2023/2/24時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Delta Live Tablesの設定では、パイプラインを実装する一つ以上のノートブック、開発、ステージング、プロダクション環境でパイプラインをどのように実行するのかを設定するパラメーターを指定します。Delta Live Tablesの設定はJSONで表現され、Delta Live TablesのUIで編集することができます。
設定
フィールド |
---|
id Type: string グローバルでユニークなパイプラインのID。IDはシステムによって割り当てられ変更することはできません。 |
name Type: string わかりやすいパイプラインの名称。UIでパイプラインジョブを識別するために使用されます。 |
storage Type: string パイプラインの実行に必要な出力データ、メタデータを格納するDBFSあるいはクラウドストレージ。 storage が設定されない場合、システムがデフォルトのdbfs:/pipelines/ を使用します。パイプラインの作成後に storage を変更することはできません。 |
configuration Type: object パイプラインを実行するクラスターのSpark設定を追加するためのオプションの設定リストです。これらの設定はDelta Live Tablesランタイムに読み込まれ、Spark設定を通じてパイプラインクエリーで使用されます。 各々の要素は key:value の形式である必要があります。configuration の使用例については、パイプラインのパラメータ化を参照ください。 |
libraries Type: array of objects パイプラインのコードと必要とされるアーティファクトを含むノートブックの配列を指定します。サンプルはパイプラインの複数のノートブックの設定を参照ください。 |
clusters Type: array of objects パイプラインを実行するクラスターの設定の配列です。詳細はクラスター設定を参照ください。 |
development Type: boolean パイプラインを development あるいはproduction モードで実行するかを示すフラグです。開発、プロダクションモードをご覧ください。 |
continuous Type: boolean パイプラインを連続で稼働させるかどうかを示すフラグです。 デフォルトは false です。 |
target Type: string パイプラインの出力データを永続化するデータベース名です。 target を設定する事で、DatabricksのUIからパイプラインの出力データを参照、検索することが可能となります。 |
channel Type: string 使用するDelta Live Tablesランタイムのバージョンです。サポートされる値は:
channel フィールドはオプションです。デフォルトはcurrent です。プロダクションのワークロードではcurrent バージョンのランタイムを使用することをお勧めします。 |
edition Type: string パイプラインを実行するDelta Live Tablesの製品エディションです。この設定によって、パイプラインの要件に応じたベストな製品エディションを選択することができます。
edition フィールドはオプションです。デフォルトはADVANCED です。 |
photon Type: boolean パイプラインを実行するためにPhotonランタイムを使用するかどうかを指定するフラグです。PhotonはDatabricksの高性能Sparkエンジンです。Photonが有効化されたパイプラインの課金と、有効化されていないパイプラインの課金は異なります。 photon フィールドはオプションです。デフォルトはfalse です。 |
pipelines.maxFlowRetryAttempts Type: int リトライ可能な障害が発生した際、パイプラインのアップデートが失敗する前にフローのリトライを試みる最大数です。 デフォルト値は2です。デフォルトでは、リトライ可能な障害が発生した場合、Delta Live Tablesランタイムは、初回の試行を含み3回フローの実行を試みます。 |
pipelines.numUpdateRetryAttempts Type: int リトライ可能な障害が発生した際、パイプラインのアップデートが失敗する前にアップデートのリトライを試みる最大数です。リトライはフルアップデートとして実行されます。 デフォルト値は5です。このパラメーターはproductionモードのアップデート実行の際にのみ適用されます。パイプライン実行がdevelopmentモードの場合にはリトライは行われません。 |
パイプラインのトリガー間隔
テーブルをアップデートするフロー、あるいはパイプライン全体のトリガー間隔をコントロールするために、pipelines.trigger.interval
を活用することができます。起動されるパイプラインはそれぞれのテーブルを一度のみ処理するので、pipelines.trigger.interval
は連続パイプラインの場合にのみ使用されます。
ストリーミングクエリーとバッチクエリーとでは異なるデフォルト値があるため、個々のテーブルでpipelines.trigger.interval
を設定することをお勧めします。処理においてパイプラインのグラフ全体のアップデートをコントロールする必要がある場合にのみ、パイプラインにこの値を設定してください。
Pythonのspark_conf
、SQLのSET
を用いてテーブルにpipelines.trigger.interval
を設定します。
@dlt.table(
spark_conf={"pipelines.trigger.interval" : "10 seconds"}
)
def <function-name>():
return (<query>)
SET pipelines.trigger.interval='10 seconds';
CREATE OR REFRESH LIVE TABLE TABLE_NAME
AS SELECT ...
パイプラインにpipelines.trigger.interval
を設定するには、パイプライン設定のconfiguration
オブジェクトに追加します。
{
"configuration": {
"pipelines.trigger.interval": "10 seconds"
}
}
pipelines.trigger.interval |
---|
デフォルト値はフロータイプによって変わります:
|
クラスター設定
Create cluster APIと同じJSONフォーマットを用いて、パイプラインで使用されるクラスターを設定することができます。二種類のクラスタータイプに対する設定を行うことができます。全ての処理が行われるdefaultクラスターと、日次のメンテナンスタスクを実行するmaintenanceクラスターです。それぞれのクラスターはlabel
フィールドで識別することができます。
クラスターのプロパティの指定はオプションであり、指定されない値に関してはシステムはデフォルト値を使用します。
注意
-
クラスター設定でSparkのバージョンを指定することはできません。Delta Live Tablesのクラスターは、最新機能を取り込むように定期的にアップデートされるDatabricksランタイムのカスタムバージョンで動作します。マニュアルでバージョンを設定するとパイプラインが失敗する場合があります。
-
Delta Live Tablesのクラスターは使用されていない時に自動でシャットダウンされるため、クラスター設定の
autotermination_minutes
を設定するクラスターポリシーの参照はエラーとなります。クラスターのシャットダウンの挙動を制御するために、developmentモードかproductionモードを使用するか、Spark設定pipelines.clusterShutdown.delay
を使用することができます。以下の例では、pipelines.clusterShutdown.delay
を5秒に設定しています。JSON{ "configuration": { "pipelines.clusterShutdown.delay": "60s" }
production
モードが有効化されている場合、pipelines.clusterShutdown.delay
のデフォルト値は0 seconds
です。development
モードが有効化されている場合、デフォルト値は2 hours
になります。 -
クラスター設定で
num_workers
を0に設定すると、クラスターはシングルノードクラスターとして作成されます。オートスケーリングクラスターを設定し、min_workers
を0、max_workers
を0に設定してもシングルノードクラスターが作成されます。オートスケーリングクラスターを設定し、
min_workers
のみを0に設定すると、クラスターはシングルノードクラスターとしては作成されません。このクラスターは停止されるまで常に1台のワーカーが存在することになります。Delta Live Tablesでシングルノードクラスターを作成するクラスター設定のサンプルです。
JSON{ "clusters": [ { "label": "default", "num_workers": 0 } ] }
注意
お使いのストレージにアクセスするためにインスタンスプロファイルや他の設定が必要な場合には、defaultクラスター、maintenanceクラスターの両方に設定してください。
defaultクラスター、maintenanceクラスターの設定例を以下に示します。
{
"clusters": [
{
"label": "default",
"node_type_id": "c5.4xlarge",
"driver_node_type_id": "c5.4xlarge",
"num_workers": 20,
"spark_conf": {
"spark.databricks.io.parquet.nativeReader.enabled": "false"
},
"aws_attributes": {
"instance_profile_arn": "arn:aws:..."
}
},
{
"label": "maintenance",
"aws_attributes": {
"instance_profile_arn": "arn:aws:..."
}
}
]
}
クラウドストレージの設定
AWSのS3ストレージへのアクセスを設定するには、AWSインスタンスプロファイルを使用します。Delta Live Tables APIあるいはDelta Live Tables UIでパイプラインを作成、編集する際にインスタンスプロファイルの設定を追加することができます。また、パイプラインクラスターのJSON設定を直接編集することでAWSインスタンスプロファイルを設定することができます。
- パイプラインのPipeline detailsページで、Settingsボタンをクリックします。Pipeline settingsページが表示されます。
- JSONボタンをクリックします。
- クラスター設定の
aws_attributes.instance_profile_arn
フィールドに、インスタンスプロファイル設定を入力します。
{
"clusters": [
{
"label": "default",
"aws_attributes": {
"instance_profile_arn": "arn:aws:..."
}
},
{
"label": "maintenance",
"aws_attributes": {
"instance_profile_arn": "arn:aws:..."
}
}
]
}
defaultとmaintenanceクラスターに対してインスタンスプロファイルを指定する必要があります。
Delta Live Tablesパイプラインのクラスターポリシーを作成する際に、インスタンスプロファイルを設定することもできます。サンプルに関しては、knowledge baseをご覧ください。
クラスターポリシー
注意
Delta Live Tablesクラスターを設定するためにクラスターポリシーを使う際には、default
とmaintenance
クラスターの両方で単一のポリシーを適用することをお勧めします。
パイプラインクラスターでクラスターポリシーを設定するには、cluster_type
フィールドをdlt
に設定したポリシーを作成します。
{
"cluster_type": {
"type": "fixed",
"value": "dlt"
}
}
パイプライン設定で、clusterのpolicy_id
フィールドをポリシーIDの値に設定します。以下の例では、IDC65B864F02000008
のクラスターポリシーを用いてdefault
とmaintenance
クラスターを設定します。
{
"clusters": [
{
"label": "default",
"policy_id": "C65B864F02000008",
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
},
{
"label": "maintenance",
"policy_id": "C65B864F02000008"
}
]
}
クラスターポリシーの作成、使用のサンプルについては、Define limits on pipeline clustersをご覧ください。
サンプル
パイプラインとクラスターの設定
以下の例では、ストレージとしてのDBFS、小規模の1ノードのクラスターを使用して、example-notebook_1で実装されたパイプラインを実行するトリガーモードのパイプラインを設定しています。
{
"name": "Example pipeline 1",
"storage": "dbfs:/pipeline-examples/storage-location/example1",
"clusters": [
{
"num_workers": 1,
"spark_conf": {}
}
],
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/example_notebook_1"
}
}
],
"continuous": false
}
パイプラインの複数のノートブックの設定
複数のノートブックを用いたパイプラインを設定するにはlibraries
フィールドを使用します。Delta Live Tablesは、パイプラインに対する処理グラフを構築するためにデータセットの依存関係を自動で解析するので、ノートブックの順序は問いません。以下の例では、example-notebook_1とexample-notebook_2で定義されるデータセットを含むパイプラインを作成します。
{
"name": "Example pipeline 3",
"storage": "dbfs:/pipeline-examples/storage-location/example3",
"libraries": [
{ "notebook": { "path": "/example-notebook_1" } },
{ "notebook": { "path": "/example-notebook_2" } }
]
}
ワークスペースファイルからコードをインポート
パイプラインコードを含むノートブックを設定するためにlibraries
フィールドを用いることに加え、Databricks repoに格納されているソースコードに含まれるパイプラインコードを設定することができます。パイプラインコードを含むソースファイルへのパスを指定するには、file
フィールドを使用します。
{
"libraries": [
{ "file": { "path": "/Repos/<user_name>@databricks.com/Apply_Changes_Into/apply_changes_into.sql" } },
{ "file": { "path": "/Repos/<user_name>@databricks.com/Apply_Changes_Into/apply_changes_into.py" } }
]
}
Delta Live Tablesによる開発ワークフローの作成
開発、ステージング、プロダクションに対応する別々のDelta Live Tablesパイプラインを作成することができるので、作成するデータのコンシューマーに影響を与えることなしに、変換ロジックをテスト、デバッグすることができます。異なるデータベースをターゲットとしつつも同じコードを使用する別々のパイプラインを作成するだけです。
完全に分離された開発環境と、開発環境からプロダクション環境にプッシュするシンプルなワークフローを構築するためにDatabricksのReposの機能と組み合わせることができます。
{
"name": "Data Ingest - DEV user@databricks",
"target": "customers_dev_user",
"libraries": ["/Repos/user@databricks.com/ingestion/etl.py"],
}
{
"name": "Data Ingest - PROD",
"target": "customers",
"libraries": ["/Repos/production/ingestion/etl.py"],
}
パイプラインのパラメータ化
データセットを定義するPython、SQLコードはパイプラインの設定でパラメータ化することができます。パラメータ化を用いることで以下のユースケースを実現することができます。
- コードと長いパスや変数の分離
- 開発、ステージング環境で処理するデータ量を削減し、テストを高速化
- 複数のデータソースに対する処理で同じ変換ロジックを再利用
以下の例では、開発用パイプラインで使用するデータを入力データのサブセットに限定するために、設定値startDate
を使用しています。
CREATE LIVE TABLE customer_events
AS SELECT * FROM sourceTable WHERE date > ${mypipeline.startDate};
@dlt.table
def customer_events():
start_date = spark.conf.get("mypipeline.startDate")
return read("sourceTable").where(col("date") > start_date)
{
"name": "Data Ingest - DEV",
"configuration": {
"mypipeline.startDate": "2021-01-02"
}
}
{
"name": "Data Ingest - PROD",
"configuration": {
"mypipeline.startDate": "2010-01-02"
}
}