LoginSignup
3
1

More than 1 year has passed since last update.

Airflowによるデータパイプラインの依存関係の管理

Last updated at Posted at 2021-07-02

Managing dependencies in data pipelines | Databricks on AWS [2021/3/17時点]の翻訳です。

データパイプラインに複雑な依存関係があることは頻繁に起こりえます。ワークフローシステムを利用することで、そのような依存関係を記述したり、パイプラインの実行をスケジュールすることができます。

Apache Airflow

Apache Airflowはデータパイプラインを管理、スケジュールするためのソリューションです。Airflowはデータパイプラインを、オペレーションの有向非巡回グラフ(DAG)で表現します。ここでは、オペレーション間の論理的な依存関係をエッジで表現します。

AirflowはDatabricks、Airflow間の密なインテグレーションを提供します。Airflow-Databricksインテグレーションによって、Databricksによって提供される最適化されたSparkエンジンとAirflowのスケジューリング機能を活用することができます。

Airflow-Databricksインテグレーションのインストール

Airflow-DatabricksインテグレーションはAirflowバージョン1.9.0で利用することができます。Airflow-Databricksインテグレーションをインストールするには、以下を実行します。

Bash
pip install "apache-airflow[databricks]"

extras(celerys3passwordなど)をインストールするには、以下を実行します。

Bash
pip install "apache-airflow[databricks, celery, s3, password]"

DatabricksRunNowOperatorオペレーター

Airflow-Databricksインテグレーションは、処理のDAGにおけるノードとしてDatabricksRunNowOperatorを提供します。このオペレーターは、DatabricksジョブのRun now APIエンドポイントに適合し、プログラムからノートブックやDBFSやS3にアップロードされたJARを実行することが可能になります。

DatabricksSubmitRunOperatorオペレーター

Airflow-Databricksインテグレーションは、処理のDAGにおけるノードとしてDatabricksSubmitRunOperatorを提供します。このオペレーターは、DatabricksジョブのRuns submit APIエンドポイントに適合し、プログラムからノートブックやDBFSやS3にアップロードされたJARを実行することが可能になります。

Databricks接続の設定

DatabricksSubmitRunOperatorを使用するには、適切なAirflow接続設定に認証情報を提供する必要があります。DatabricksSubmitRunOperatordatabricks_conn_idを指定しない場合、デフォルトでは、オペレーターはdatabricks_defaultという名前の接続設定から認証情報を探そうとします。

Managing Connectionsに記載されているように、Airflowのweb UIからAirflowの接続設定を行うこともできます。Databricksとの接続に関しては、Loginフィールドをtokenにし、Extraフィールドに以下を設定します。

JSON
{"token": "<personal access token>", "host":"<Databricks hostname>"}

ここで、<personal access token>は、Databricksで作成したpersonal access tokenであり、<Databricks hostname>はDatabricksがデプロイされているホスト名となります。

サンプル

このサンプルでは、ローカルのマシンで実行されるシンプルなAirflowデプロイメントをどのように設定し、Databricksで実行されるように指定されたサンプルDAGをどのようにデプロイするのかを説明します。

Airflowデータベースの初期化

Airflowが様々なメタデータを追跡するのに用いるSQLiteデータベースを初期化します。プロダクションのAirflowのデプロイメントにおいては、標準的なデータベースを用いてAirflowを設定するかもしれません。初期化を行うには以下を実行します。

Bash
airflow initdb

Airflowデプロイメントに対するデフォルト設定とSQLiteデータベースが~/airflowに初期化されます。

DAG定義

DAG定義はPythonファイルであり、この例ではexample_databricks_operator.pyという名前となります。この例では、一つの線形の依存関係を持つ二つのDatabricksジョブを実行します。最初のDatabricksジョブは/Users/airflow@example.com/PrepareDataに配置されたノートブックを起動し、二つ目のジョブはdbfs:/lib/etl-0.1.jarに配置されているJARを実行します。サンプルのDAG定義は二つのDatabricksSubmitRunOperatorタスクから構成され、最後にset_downstreamメソッドを用いて依存関係を設定します。コードのスケルトンバージョンは以下のようなものになります。

Python
notebook_task = DatabricksSubmitRunOperator(
    task_id='notebook_task',
    dag=dag,
    json=notebook_task_params)

spark_jar_task = DatabricksSubmitRunOperator(
    task_id='spark_jar_task',
    dag=dag,
    json=spark_jar_task_params)

notebook_task.set_downstream(spark_jar_task)

Airflowと必要なクラスのインポート

DAG定義のトップレベルでは、airflowDAGDatabricksSubmitRunOperatorをインポートします。

Python
import airflow

from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator

グローバル引数の設定

次のセクションでは、DAGにおけるそれぞれのタスクに適用されるデフォルトの引数を設定します。

Python
args = {
    'owner': 'airflow',
    'email': ['airflow@example.com'],
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(0)
}

ここで興味深い引数が二つあります。depends_on_paststart_dateです。depends_on_pasttrueに設定することで、以前のタスクが成功しない限り、このタスクは実行されなくなります。start_date引数は、最初のタスクをいつ実行するのかをスケジュールします。

DAGインスタンスの生成

DAGインスタンス作成のステートメントによって、DAGにユニークなIDを与え、デフォルトの引数を適用し、日次のスケジュールを設定します。

Python
dag = DAG(dag_id='example_databricks_operator', default_args=args, schedule_interval='@daily')

次のステートメントでは、Sparkバージョン、ノードのタイプ、タスク実行に必要なクラスターのワーカー数を指定します。指定内容のスキーマはジョブのRuns Submitエンドポイントのnew_clusterフィールドに合致します。

Python
new_cluster = {
    'spark_version': '6.0.x-scala2.11',
    'node_type_id': 'i3.xlarge',
    'aws_attributes': {
        'availability': 'ON_DEMAND'
    },
    'num_workers': 8
}

タスクをDAGに登録

notebook_taskに対しては、DatabricksSubmitRunOperatorインスタンスを作成します。

Python
notebook_task_params = {
    'new_cluster': new_cluster,
    'notebook_task': {
    'notebook_path': '/Users/airflow@example.com/PrepareData',
  },
}
# Example of using the JSON parameter to initialize the operator.
notebook_task = DatabricksSubmitRunOperator(
  task_id='notebook_task',
  dag=dag,
  json=notebook_task_params)

このコードでは、JSONパラメーターとしてRuns Submitエンドポイントに適合するPythonのディクショナリーを与えます。

dbfs:/lib/etl-0.1.jarに配置されているJARファイルを実行するspark_jar_taskに対しては、DatabricksSubmitRunOperatorのインスタンスを生成します。

Python
# Example of using the named parameters of DatabricksSubmitRunOperator to initialize the operator.
spark_jar_task = DatabricksSubmitRunOperator(
  task_id='spark_jar_task',
  dag=dag,
  new_cluster=new_cluster,
  spark_jar_task={
    'main_class_name': 'com.example.ProcessData'
  },
  libraries=[
    {
      'jar': 'dbfs:/lib/etl-0.1.jar'
    }
  ]
)

後段の処理を実行するためにspark_jar_taskを設定するには、依存関係を登録するためにnotebook_taskset_downstreamメソッドを使用します。

Python
notebook_task.set_downstream(spark_jar_task)

ここで使用しているnotebook_taskでは、submit runエンドポイントに対するすべての仕様をjsonパラメーターで指定し、spark_jar_taskではDatabricksSubmitRunOperatorに対するsubmit runエンドポイントのトップレベルのキーをパラメーターとして直接記載したことに注意してください。オペレーターのインスタンスを生成する両方の方法は等価ですが、後者の方法ではspark_python_taskspark_submit_taskのように新たなトップレベルのフィールドを追加することはできません。詳細はDatabricksSubmitRunOperator APIを参照ください。

AirflowへのDAGのインストール、検証

AirflowにDAGをインストールするには、ディレクトリ~/airflow/dagsを作成し、ディレクトリにDAG定義をコピーします。

AirflowがDAGを読み込んだことを検証するには、list_dagsコマンドを実行します。

Bash
airflow list_dags

[2017-07-06 10:27:23,868] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-07-06 10:27:24,238] {models.py:168} INFO - Filling up the DagBag from /Users/<user>/airflow/dags

-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_databricks_operator
...

訳者注
Airflow 2.1.0ではairflow dags listというコマンドになります。

Airflow UIでのDAGの可視化

Airflowのweb UIでDAGを可視化することができます。airflow webserverを実行し、localhost:8080に接続します。example_databricks_operatorをクリックしDAGを可視化します。以下が例となります。

Airflowへの接続設定

Databricksに対する接続の認証情報はDAG定義では指定されていません。デフォルトでは、DatabricksSubmitRunOperatordatabricks_conn_idパラメーターをdatabricks_defaultに設定します。Databricks接続の設定に記載されているweb UIで、IDがdatabricks_defaultである接続設定を確認します。

それぞれのタスクのテスト

notebook_taskをテストするには、airflow test example_databricks_operator notebook_task <YYYY-MM-DD>を実行し、spark_jar_taskに対しては、airflow test example_databricks_operator spark_jar_task <YYYY-MM-DD>を実行します。DAGのスケジュール処理を実行する場合には、airflow schedulerコマンドでスケジューラーデーモンプロセスを起動します。

スケジューラを開始した後は、web UIでDAGで処理待ちのランを参照することが可能になります。

訳者注
Airflow 2.1.0ではairflow tasks test example_databricks_operator notebook_task 2021-07-02というコマンドになります。

Databricks 無料トライアル

Databricks 無料トライアル

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1