1
0

More than 1 year has passed since last update.

ワークフローでDelta Live Tablesパイプラインを実行する

Posted at

Run a Delta Live Tables pipeline in a workflow | Databricks on AWS [2022/1/12時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Databricksジョブ、Apache Airflow、Azure Data Factoryのデータ処理ワークフローの一部としてDelta Live Tablesパイプラインを実行することができます。

ジョブ

データ処理ワークフローを実行するために、Databricksジョブのマルチタスクをオーケストレーションすることができます。ジョブにDelta Live Tablesパイプラインを含めるには、ジョブを作成する際にPipelineタスクを使用します。

Apache Airflow

Apache Airflowは、データワークフローを管理、スケジュールするためのオープンソースのソリューションです。Airflowはワークフローをオペレーションの有向非巡回グラフ(DAG)として表現します。Pythonでワークフローを定義して、Airflowは処理の実行とスケジューリングを管理します、DatabricksでAirflowをインストールし、活用するための情報についてはApache Airflowをご覧ください。

Airflowワークフローの一部としてDelta Live Tablesパイプラインを実行するためには、DatabricksSubmitRunOperatorを使用してください。

要件

AirflowのDelta Live Tablesサポートを使用するには、以下の要件を満足する必要があります。

  • Airflowバージョン2.1.0以降
  • Databricks providerパッケージバージョン2.1.0以降

サンプル

以下のサンプルでは、ID8279d543-063c-4d63-9926-dae38e35ce8bのDelta Live Tablesパイプラインのアップデートを起動するAirflow DAGを作成しています。

Python
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
  'owner': 'airflow'
}

with DAG('dlt',
         start_date=days_ago(2),
         schedule_interval="@once",
         default_args=default_args
         ) as dag:

  opr_run_now=DatabricksSubmitRunOperator(
    task_id='run_now',
    databricks_conn_id='CONNECTION_ID',
    pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
  )

CONNECTION_IDをお使いのワークスペースのAirflowコネクションのIDで置き換えてください。

このサンプルをairflow/dagsディレクトリに保存し、DAGを参照・実行するためにはAirflowのUIを使用します。パイプラインのアップデートの詳細を参照するには、Delta Live TablesのUIを使用します。

Azure Data Factory

Azure Data Factoryは、データのインテグレーション、変換ワークフローのオーケストレーションを実現するクラウドベースのETLサービスです。Azure Data Factoryは、ノートブック、JARタスク、Pythonスクリプトを含むワークフローのDatabricksタスクの実行を直接サポートしています。Azure Data FactoryのWebアクティビティからDelta Live TablesのAPIを呼び出すことで、ワークフローにパイプラインを含めることができます。

  1. データファクトリーを作成するか、既存のデータファクトリーを開きます。

  2. 作成が完了したら、データファクトリーのページを開き、Open Azure Data Factory Studioタイルをクリックします。Azure Data FactoryのUIが表示されます。

  3. Databricksリンクサービスを作成します。

  4. Azure Data Factory StudioのUIのNewドロップダウンからPipelineを選択し、新規Azure Data Factoryパイプラインを作成します。

  5. Activitiesツールボックスで、Generalを展開し、パイプラインキャンバスにWebアクティビティをドラッグします。Settingsタブをクリックし、以下を入力します。

    • URL: https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates

    <databricks-instance>Databricksのワークスペースインスタンス名、例えば、dbc-a1b2345c-d6e7.cloud.databricks.comで置き換えてください。

    <pipeline-id>をパイプラインIDで置き換えてください。

    • Method: ドロップダウンからPOSTを選択します。
    • Headers: + Newをクリックします。NameテキストボックスにAuthorizationを入力します。ValueBearer <personal-access-token>を入力します。

    <personal-access-token>をDatabricksパーソナルアクセストークンで置き換えてください。

    • Body: 追加のリクエストパラメーターを指定するには、パラメーターを含むJSONドキュメントを入力します。例えば、パイプラインの全てのデータのアップデート、再処理を起動するには:{"full_refresh": "true"}を指定します。追加のリクエストパラメーターがない場合は、空の中括弧({})を指定します。

Webアクティビティをテストするには、Data Factory UIのパイプラインツールバーのDebugをクリックします。Azure Data FactoryパイプラインのOutputタブに、エラーを含む出力、実行ステータスが表示されます。パイプラインのアップデートの詳細を参照するにはDelta Live TablesのUIを使用します。

ティップス
一般的なワークフロー要件には、以前のタスクの完了後にタスクを起動するというものがあります。Delta Live Tablesのupdatesリクエストは非同期であり、リクエストはあっ苦デートが完了した後ではなくアップデートが起動した後に値を返します。Delta Live Tablesのアップデートに依存するお使いのAzure Data Factoryパイプラインは、アップデートが完了するまで待たなくてはなりません。アップデートの完了まで待つためのオプションは、Delta Live Tablesアップデートを起動するWebアクティビティの後にUntilアクティビティを追加するというものです。

  1. アップデートの完了後の待ち時間を、Waitアクティビティに秒数で指定します。
  2. Waitアクティビティの後に、アップデートの状態を取得するためにDelta Live TablesのGet update detailsリクエストを使用するWebアクティビティを追加します。レスポンスのstateフィールドには、アップデートの完了を含む、アップデートの状態が含まれます。
  3. Untilアクティビティの終了条件を指定するためにstateフィールドの値を使用します。stateの値に基づいてパイプラインの変数を追加し、この変数を終了条件に使用するためにSet Variableアクティビティを使用することもできます。

Databricks 無料トライアル

Databricks 無料トライアル

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0