本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
本書では、Databricksの強力なjobs APIとAmazonマネージドApache Airflow(MWAA)をどのように活用するのか、有効非巡回グラフ(DAG)をモニタリングするためにどのようにCloudWatchとDatabricksベースのタスクを連携させるのかを説明します。さらに、DAGのパフォーマンスメトリクスに基づくアラートの作成方法も説明します。
Databricksにおけるオーケストレーションおよびアラート
Databricksにおけるジョブオーケストレーションは完全にインテグレーションされた機能です。お客様はジョブのAPIとUIを用いて、ジョブやモニタリングのためのメールアラートのような機能を活用、管理することができます。このパワフルなAPI駆動のアプローチを用いることで、DatabricksのジョブはAPIさえ提供していれば、あらゆるものとオーケストレートできます(CRMからのデータ取り出しなど)。Databricksのオーケストレーションは、シングルタスク、マルチタスクをサポートしており、新たにDelta Live Tablesのジョブも追加されました。
AmazonマネージドAirflow
Amazon Managed Workflows for Apache Airflow(MWAA)は、Apache Airflowに対するマネージドオーケストレーションサービスです。MWAAはお客様に代わって、AWSのセキュリティ、可用性、スケーラビリティを用いて、オープンソースのApache Airflowプラットフォームを管理します。MWAAは組み込み済みプラグインを通じて、お客様にAWSサービス、様々なサードパーティとの連携容易性のメリットを提供しますので、お客様は複雑なデータ処理パイプラインを構築することができます。
ハイレベルのアーキテクチャ図
Databricksクラスターを起動しノートブックを実行するシンプルなDAGを作成します。MWAAは処理の実行を監視します。
ここではシンプルなジョブ定義をしていますが、MWAAでは様々な複雑なワークロードをオーケストレートすることができます。
環境のセットアップ
本書では、皆様がDatabricksワークスペースへのアクセスができることを前提としています。こちらからフリーのサインアップを行い、Databricksクラスターを設定します。さらに、MWAAへの接続を設定するために用いるAPIトークンを作成します。
MWAA環境の作成方法はこちらの手順を参照してください。
Databricks接続の作成方法
最初のステップは、MWAAでDatabricks接続の設定です。
DAGのサンプル
次に皆様のDAGを、MWAA環境を作成する際に指定したS3バケットのフォルダーにアップロードします。アップロードしたDAGは自動でMWAAのUIに表示されます。
こちらがAirflow DAGのサンプルであり、新規Databricksジョブクラスターの設定、Databricksノートブックタスクを作成し、Databricksで実行するためにノートブックタスクをサブミットします。
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator, DatabricksRunNowOperator
from datetime import datetime, timedelta
#Define params for Submit Run Operator
new_cluster = {
'spark_version': '7.3.x-scala2.12',
'num_workers': 2,
'node_type_id': 'i3.xlarge',
"aws_attributes": {
"instance_profile_arn": "arn:aws:iam::XXXXXXX:instance-profile/databricks-data-role"
}
}
notebook_task = {
'notebook_path': '/Users/xxxxx@XXXXX.com/test',
}
#Define params for Run Now Operator
notebook_params = {
"Variable":5
}
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=2)
}
with DAG('databricks_dag',
start_date=datetime(2021, 1, 1),
schedule_interval='@daily',
catchup=False,
default_args=default_args
) as dag:
opr_submit_run = DatabricksSubmitRunOperator(
task_id='submit_run',
databricks_conn_id='databricks_default',
new_cluster=new_cluster,
notebook_task=notebook_task
)
opr_submit_run
最新バージョンファイルはGitHubから取得できます。
起動するとDatabricksのクラスターUIページでジョブクラスターを確認できます。
トラブルシューティング
Amazon MWAAは、全てのAirflowログのためにAmazon CloudWatchを使用します。これはDAGの失敗のトラブルシューティングに役立ちます。
CloudWatchのメトリクス、アラート
次に、DAGの成功をモニタリングするためのメトリクスを作成します。Amazon MWAAは数多くのメトリクスをサポートしています。
アラームを生成するためにTaskInstanceFailures
を使用します。
閾値(threshold)にはゼロを選択します(1時間の期間で何かしらの障害が発生した場合には、アラートを送信するようにしています)。
DAGが失敗した際に生成されるCloudWatchのEmail通知の例を以下に示します。
You are receiving this email because your Amazon CloudWatch Alarm “DatabricksDAGFailure” in the US East (N. Virginia) region has entered the ALARM state, because “Threshold Crossed
まとめ
この記事では、新規Databricksジョブクラスターを作成、設定し、Databricksで処理を実行するためにDatabricksノートブックタスクを送信する方法を説明しました。そして、サンプルワークフローを監視し、エラーが起きた際に通知を受け取るために、CloudWatchと密連携したMWAAを活用しました。
次のステップ
- Databricks 無料トライアルを試してみてください。
- Amazon Managed Workflow for Apache Airflow (MWAA)を試してみてください。
コードリポジトリ