0
0

More than 1 year has passed since last update.

Apache AirflowのマネージドワークフローによるAWS Databricksのワークロードのオーケストレーション

Posted at

How to Orchestrate Databricks Workloads on AWS With Managed Workflows for Apache Airflow - The Databricks Blogの翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

本書では、Databricksの強力なjobs APIとAmazonマネージドApache Airflow(MWAA)をどのように活用するのか、有効非巡回グラフ(DAG)をモニタリングするためにどのようにCloudWatchとDatabricksベースのタスクを連携させるのかを説明します。さらに、DAGのパフォーマンスメトリクスに基づくアラートの作成方法も説明します。

Databricksにおけるオーケストレーションおよびアラート

Databricksにおけるジョブオーケストレーションは完全にインテグレーションされた機能です。お客様はジョブのAPIとUIを用いて、ジョブやモニタリングのためのメールアラートのような機能を活用、管理することができます。このパワフルなAPI駆動のアプローチを用いることで、DatabricksのジョブはAPIさえ提供していれば、あらゆるものとオーケストレートできます(CRMからのデータ取り出しなど)。Databricksのオーケストレーションは、シングルタスク、マルチタスクをサポートしており、新たにDelta Live Tablesのジョブも追加されました。

AmazonマネージドAirflow

Amazon Managed Workflows for Apache Airflow(MWAA)は、Apache Airflowに対するマネージドオーケストレーションサービスです。MWAAはお客様に代わって、AWSのセキュリティ、可用性、スケーラビリティを用いて、オープンソースのApache Airflowプラットフォームを管理します。MWAAは組み込み済みプラグインを通じて、お客様にAWSサービス、様々なサードパーティとの連携容易性のメリットを提供しますので、お客様は複雑なデータ処理パイプラインを構築することができます。

ハイレベルのアーキテクチャ図

Databricksクラスターを起動しノートブックを実行するシンプルなDAGを作成します。MWAAは処理の実行を監視します。

ここではシンプルなジョブ定義をしていますが、MWAAでは様々な複雑なワークロードをオーケストレートすることができます。

環境のセットアップ

本書では、皆様がDatabricksワークスペースへのアクセスができることを前提としています。こちらからフリーのサインアップを行い、Databricksクラスターを設定します。さらに、MWAAへの接続を設定するために用いるAPIトークンを作成します。

MWAA環境の作成方法はこちらの手順を参照してください。

Databricks接続の作成方法

最初のステップは、MWAAでDatabricks接続の設定です。

DAGのサンプル

次に皆様のDAGを、MWAA環境を作成する際に指定したS3バケットのフォルダーにアップロードします。アップロードしたDAGは自動でMWAAのUIに表示されます。

こちらがAirflow DAGのサンプルであり、新規Databricksジョブクラスターの設定、Databricksノートブックタスクを作成し、Databricksで実行するためにノートブックタスクをサブミットします。

Python
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator, DatabricksRunNowOperator
from datetime import datetime, timedelta 

#Define params for Submit Run Operator
new_cluster = {
    'spark_version': '7.3.x-scala2.12',
    'num_workers': 2,
    'node_type_id': 'i3.xlarge',
     "aws_attributes": {
        "instance_profile_arn": "arn:aws:iam::XXXXXXX:instance-profile/databricks-data-role"
    }
}

notebook_task = {
    'notebook_path': '/Users/xxxxx@XXXXX.com/test',
}

#Define params for Run Now Operator
notebook_params = {
    "Variable":5
}

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2)
}

with DAG('databricks_dag',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    default_args=default_args
    ) as dag:

    opr_submit_run = DatabricksSubmitRunOperator(
        task_id='submit_run',
        databricks_conn_id='databricks_default',
        new_cluster=new_cluster,
        notebook_task=notebook_task
    )
    opr_submit_run

最新バージョンファイルはGitHubから取得できます。

MWAAでDAGを起動します。

起動するとDatabricksのクラスターUIページでジョブクラスターを確認できます。

トラブルシューティング

Amazon MWAAは、全てのAirflowログのためにAmazon CloudWatchを使用します。これはDAGの失敗のトラブルシューティングに役立ちます。

CloudWatchのメトリクス、アラート

次に、DAGの成功をモニタリングするためのメトリクスを作成します。Amazon MWAAは数多くのメトリクスをサポートしています。

アラームを生成するためにTaskInstanceFailuresを使用します。

閾値(threshold)にはゼロを選択します(1時間の期間で何かしらの障害が発生した場合には、アラートを送信するようにしています)。

最後に、アクションとしてEmail通知を選択します。

DAGが失敗した際に生成されるCloudWatchのEmail通知の例を以下に示します。

You are receiving this email because your Amazon CloudWatch Alarm “DatabricksDAGFailure” in the US East (N. Virginia) region has entered the ALARM state, because “Threshold Crossed

まとめ

この記事では、新規Databricksジョブクラスターを作成、設定し、Databricksで処理を実行するためにDatabricksノートブックタスクを送信する方法を説明しました。そして、サンプルワークフローを監視し、エラーが起きた際に通知を受け取るために、CloudWatchと密連携したMWAAを活用しました。

次のステップ

コードリポジトリ

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0