Managing dependencies in data pipelines | Databricks on AWS [2021/3/17時点]の翻訳です。
データパイプラインに複雑な依存関係があることは頻繁に起こりえます。ワークフローシステムを利用することで、そのような依存関係を記述したり、パイプラインの実行をスケジュールすることができます。
Apache Airflow
Apache Airflowはデータパイプラインを管理、スケジュールするためのソリューションです。Airflowはデータパイプラインを、オペレーションの有向非巡回グラフ(DAG)で表現します。ここでは、オペレーション間の論理的な依存関係をエッジで表現します。
AirflowはDatabricks、Airflow間の密なインテグレーションを提供します。Airflow-Databricksインテグレーションによって、Databricksによって提供される最適化されたSparkエンジンとAirflowのスケジューリング機能を活用することができます。
Airflow-Databricksインテグレーションのインストール
Airflow-DatabricksインテグレーションはAirflowバージョン1.9.0で利用することができます。Airflow-Databricksインテグレーションをインストールするには、以下を実行します。
pip install "apache-airflow[databricks]"
extras(celery
、s3
、password
など)をインストールするには、以下を実行します。
pip install "apache-airflow[databricks, celery, s3, password]"
DatabricksRunNowOperator
オペレーター
Airflow-Databricksインテグレーションは、処理のDAGにおけるノードとしてDatabricksRunNowOperatorを提供します。このオペレーターは、DatabricksジョブのRun now APIエンドポイントに適合し、プログラムからノートブックやDBFSやS3にアップロードされたJARを実行することが可能になります。
DatabricksSubmitRunOperator
オペレーター
Airflow-Databricksインテグレーションは、処理のDAGにおけるノードとしてDatabricksSubmitRunOperatorを提供します。このオペレーターは、DatabricksジョブのRuns submit APIエンドポイントに適合し、プログラムからノートブックやDBFSやS3にアップロードされたJARを実行することが可能になります。
Databricks接続の設定
DatabricksSubmitRunOperator
を使用するには、適切なAirflow接続設定に認証情報を提供する必要があります。DatabricksSubmitRunOperator
でdatabricks_conn_id
を指定しない場合、デフォルトでは、オペレーターはdatabricks_default
という名前の接続設定から認証情報を探そうとします。
Managing Connectionsに記載されているように、Airflowのweb UIからAirflowの接続設定を行うこともできます。Databricksとの接続に関しては、Loginフィールドをtoken
にし、Extraフィールドに以下を設定します。
{"token": "<personal access token>", "host":"<Databricks hostname>"}
ここで、<personal access token>
は、Databricksで作成したpersonal access tokenであり、<Databricks hostname>
はDatabricksがデプロイされているホスト名となります。
サンプル
このサンプルでは、ローカルのマシンで実行されるシンプルなAirflowデプロイメントをどのように設定し、Databricksで実行されるように指定されたサンプルDAGをどのようにデプロイするのかを説明します。
Airflowデータベースの初期化
Airflowが様々なメタデータを追跡するのに用いるSQLiteデータベースを初期化します。プロダクションのAirflowのデプロイメントにおいては、標準的なデータベースを用いてAirflowを設定するかもしれません。初期化を行うには以下を実行します。
airflow initdb
Airflowデプロイメントに対するデフォルト設定とSQLiteデータベースが~/airflow
に初期化されます。
DAG定義
DAG定義はPythonファイルであり、この例ではexample_databricks_operator.py
という名前となります。この例では、一つの線形の依存関係を持つ二つのDatabricksジョブを実行します。最初のDatabricksジョブは/Users/airflow@example.com/PrepareData
に配置されたノートブックを起動し、二つ目のジョブはdbfs:/lib/etl-0.1.jar
に配置されているJARを実行します。サンプルのDAG定義は二つのDatabricksSubmitRunOperator
タスクから構成され、最後にset_downstream
メソッドを用いて依存関係を設定します。コードのスケルトンバージョンは以下のようなものになります。
notebook_task = DatabricksSubmitRunOperator(
task_id='notebook_task',
dag=dag,
json=notebook_task_params)
spark_jar_task = DatabricksSubmitRunOperator(
task_id='spark_jar_task',
dag=dag,
json=spark_jar_task_params)
notebook_task.set_downstream(spark_jar_task)
Airflowと必要なクラスのインポート
DAG定義のトップレベルでは、airflow
、DAG
、DatabricksSubmitRunOperator
をインポートします。
import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
グローバル引数の設定
次のセクションでは、DAGにおけるそれぞれのタスクに適用されるデフォルトの引数を設定します。
args = {
'owner': 'airflow',
'email': ['airflow@example.com'],
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(0)
}
ここで興味深い引数が二つあります。depends_on_past
とstart_date
です。depends_on_past
をtrue
に設定することで、以前のタスクが成功しない限り、このタスクは実行されなくなります。start_date
引数は、最初のタスクをいつ実行するのかをスケジュールします。
DAGインスタンスの生成
DAGインスタンス作成のステートメントによって、DAGにユニークなIDを与え、デフォルトの引数を適用し、日次のスケジュールを設定します。
dag = DAG(dag_id='example_databricks_operator', default_args=args, schedule_interval='@daily')
次のステートメントでは、Sparkバージョン、ノードのタイプ、タスク実行に必要なクラスターのワーカー数を指定します。指定内容のスキーマはジョブのRuns Submitエンドポイントのnew_cluster
フィールドに合致します。
new_cluster = {
'spark_version': '6.0.x-scala2.11',
'node_type_id': 'i3.xlarge',
'aws_attributes': {
'availability': 'ON_DEMAND'
},
'num_workers': 8
}
タスクをDAGに登録
notebook_task
に対しては、DatabricksSubmitRunOperator
インスタンスを作成します。
notebook_task_params = {
'new_cluster': new_cluster,
'notebook_task': {
'notebook_path': '/Users/airflow@example.com/PrepareData',
},
}
# Example of using the JSON parameter to initialize the operator.
notebook_task = DatabricksSubmitRunOperator(
task_id='notebook_task',
dag=dag,
json=notebook_task_params)
このコードでは、JSONパラメーターとしてRuns Submit
エンドポイントに適合するPythonのディクショナリーを与えます。
dbfs:/lib/etl-0.1.jar
に配置されているJARファイルを実行するspark_jar_task
に対しては、DatabricksSubmitRunOperator
のインスタンスを生成します。
# Example of using the named parameters of DatabricksSubmitRunOperator to initialize the operator.
spark_jar_task = DatabricksSubmitRunOperator(
task_id='spark_jar_task',
dag=dag,
new_cluster=new_cluster,
spark_jar_task={
'main_class_name': 'com.example.ProcessData'
},
libraries=[
{
'jar': 'dbfs:/lib/etl-0.1.jar'
}
]
)
後段の処理を実行するためにspark_jar_task
を設定するには、依存関係を登録するためにnotebook_task
でset_downstream
メソッドを使用します。
notebook_task.set_downstream(spark_jar_task)
ここで使用しているnotebook_task
では、submit runエンドポイントに対するすべての仕様をjson
パラメーターで指定し、spark_jar_task
ではDatabricksSubmitRunOperator
に対するsubmit runエンドポイントのトップレベルのキーをパラメーターとして直接記載したことに注意してください。オペレーターのインスタンスを生成する両方の方法は等価ですが、後者の方法ではspark_python_task
、spark_submit_task
のように新たなトップレベルのフィールドを追加することはできません。詳細はDatabricksSubmitRunOperator APIを参照ください。
AirflowへのDAGのインストール、検証
AirflowにDAGをインストールするには、ディレクトリ~/airflow/dags
を作成し、ディレクトリにDAG定義をコピーします。
AirflowがDAGを読み込んだことを検証するには、list_dags
コマンドを実行します。
airflow list_dags
[2017-07-06 10:27:23,868] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-07-06 10:27:24,238] {models.py:168} INFO - Filling up the DagBag from /Users/<user>/airflow/dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_databricks_operator
...
訳者注
Airflow 2.1.0ではairflow dags list
というコマンドになります。
Airflow UIでのDAGの可視化
Airflowのweb UIでDAGを可視化することができます。airflow webserver
を実行し、localhost:8080
に接続します。example_databricks_operator
をクリックしDAGを可視化します。以下が例となります。
Airflowへの接続設定
Databricksに対する接続の認証情報はDAG定義では指定されていません。デフォルトでは、DatabricksSubmitRunOperator
はdatabricks_conn_id
パラメーターをdatabricks_default
に設定します。Databricks接続の設定に記載されているweb UIで、IDがdatabricks_default
である接続設定を確認します。
それぞれのタスクのテスト
notebook_task
をテストするには、airflow test example_databricks_operator notebook_task <YYYY-MM-DD>
を実行し、spark_jar_task
に対しては、airflow test example_databricks_operator spark_jar_task <YYYY-MM-DD>
を実行します。DAGのスケジュール処理を実行する場合には、airflow scheduler
コマンドでスケジューラーデーモンプロセスを起動します。
スケジューラを開始した後は、web UIでDAGで処理待ちのランを参照することが可能になります。
訳者注
Airflow 2.1.0ではairflow tasks test example_databricks_operator notebook_task 2021-07-02
というコマンドになります。