Run a Delta Live Tables pipeline in a workflow | Databricks on AWS [2022/1/12時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Databricksジョブ、Apache Airflow、Azure Data Factoryのデータ処理ワークフローの一部としてDelta Live Tablesパイプラインを実行することができます。
ジョブ
データ処理ワークフローを実行するために、Databricksジョブのマルチタスクをオーケストレーションすることができます。ジョブにDelta Live Tablesパイプラインを含めるには、ジョブを作成する際にPipelineタスクを使用します。
Apache Airflow
Apache Airflowは、データワークフローを管理、スケジュールするためのオープンソースのソリューションです。Airflowはワークフローをオペレーションの有向非巡回グラフ(DAG)として表現します。Pythonでワークフローを定義して、Airflowは処理の実行とスケジューリングを管理します、DatabricksでAirflowをインストールし、活用するための情報についてはApache Airflowをご覧ください。
Airflowワークフローの一部としてDelta Live Tablesパイプラインを実行するためには、DatabricksSubmitRunOperatorを使用してください。
要件
AirflowのDelta Live Tablesサポートを使用するには、以下の要件を満足する必要があります。
- Airflowバージョン2.1.0以降
- Databricks providerパッケージバージョン2.1.0以降
サンプル
以下のサンプルでは、ID8279d543-063c-4d63-9926-dae38e35ce8b
のDelta Live Tablesパイプラインのアップデートを起動するAirflow DAGを作成しています。
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
CONNECTION_ID
をお使いのワークスペースのAirflowコネクションのIDで置き換えてください。
このサンプルをairflow/dags
ディレクトリに保存し、DAGを参照・実行するためにはAirflowのUIを使用します。パイプラインのアップデートの詳細を参照するには、Delta Live TablesのUIを使用します。
Azure Data Factory
Azure Data Factoryは、データのインテグレーション、変換ワークフローのオーケストレーションを実現するクラウドベースのETLサービスです。Azure Data Factoryは、ノートブック、JARタスク、Pythonスクリプトを含むワークフローのDatabricksタスクの実行を直接サポートしています。Azure Data FactoryのWebアクティビティからDelta Live TablesのAPIを呼び出すことで、ワークフローにパイプラインを含めることができます。
-
データファクトリーを作成するか、既存のデータファクトリーを開きます。
-
作成が完了したら、データファクトリーのページを開き、Open Azure Data Factory Studioタイルをクリックします。Azure Data FactoryのUIが表示されます。
-
Databricksリンクサービスを作成します。
-
Azure Data Factory StudioのUIのNewドロップダウンからPipelineを選択し、新規Azure Data Factoryパイプラインを作成します。
-
Activitiesツールボックスで、Generalを展開し、パイプラインキャンバスにWebアクティビティをドラッグします。Settingsタブをクリックし、以下を入力します。
-
URL:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
<databricks-instance>
をDatabricksのワークスペースインスタンス名、例えば、dbc-a1b2345c-d6e7.cloud.databricks.com
で置き換えてください。<pipeline-id>
をパイプラインIDで置き換えてください。- Method: ドロップダウンからPOSTを選択します。
-
Headers: + Newをクリックします。NameテキストボックスにAuthorizationを入力します。Valueに
Bearer <personal-access-token>
を入力します。
<personal-access-token>
をDatabricksパーソナルアクセストークンで置き換えてください。-
Body: 追加のリクエストパラメーターを指定するには、パラメーターを含むJSONドキュメントを入力します。例えば、パイプラインの全てのデータのアップデート、再処理を起動するには:
{"full_refresh": "true"}
を指定します。追加のリクエストパラメーターがない場合は、空の中括弧({}
)を指定します。
-
URL:
Webアクティビティをテストするには、Data Factory UIのパイプラインツールバーのDebugをクリックします。Azure Data FactoryパイプラインのOutputタブに、エラーを含む出力、実行ステータスが表示されます。パイプラインのアップデートの詳細を参照するにはDelta Live TablesのUIを使用します。
ティップス
一般的なワークフロー要件には、以前のタスクの完了後にタスクを起動するというものがあります。Delta Live Tablesのupdates
リクエストは非同期であり、リクエストはあっ苦デートが完了した後ではなくアップデートが起動した後に値を返します。Delta Live Tablesのアップデートに依存するお使いのAzure Data Factoryパイプラインは、アップデートが完了するまで待たなくてはなりません。アップデートの完了まで待つためのオプションは、Delta Live Tablesアップデートを起動するWebアクティビティの後にUntilアクティビティを追加するというものです。
- アップデートの完了後の待ち時間を、Waitアクティビティに秒数で指定します。
- Waitアクティビティの後に、アップデートの状態を取得するためにDelta Live TablesのGet update detailsリクエストを使用するWebアクティビティを追加します。レスポンスの
state
フィールドには、アップデートの完了を含む、アップデートの状態が含まれます。 - Untilアクティビティの終了条件を指定するために
state
フィールドの値を使用します。state
の値に基づいてパイプラインの変数を追加し、この変数を終了条件に使用するためにSet Variableアクティビティを使用することもできます。