1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Apache Airflowのジョブの失敗をTeamsに通知する

Last updated at Posted at 2020-12-20

はじめに

Apache Airflowにはタスクが失敗した場合に呼び出すことができる on_failure_callback という仕組みがある。これを使ってMicrosoft Teamsの特定のチャネルにアラートをPOSTする仕組みを作成する。

Teamsへの通知の仕組み

TeamsにメッセージをPOSTする処理は Logic App で作成し、それを呼び出すようにAirflow側で実装するのが便利そうだ。今回はTeamsにPOSTするだけの処理にしたが、これ以外の通知方法(メールなど)や、自動化処理も必要に応じて簡単に追加できるので、この部分をLogic Appに任せるというのは良いアイデアのように思う。
image.png

設定

Logic Appの設定

まずLogic Appを設定する。HTTPリクエストを受信した際に起動するようにトリガーをセットし、TeamsにメッセージをPostするというシンプルなワークフローを定義する。
image.png

HTTPリクエスト受信の設定

HTTPリクエストの受信の際に、Request Bodyとして受信するJSONスキーマを定義する。ここではAirflowから渡すことができるContextの一部の値を受け取ることを想定して3つのフィールドを定義している。
image.png
定義のJSONのTextは以下の通り。

{
    "properties": {
        "ExecDate": {
            "type": "string"
        },
        "RunID": {
            "type": "string"
        },
        "TaskInstance": {
            "type": "string"
        }
    },
    "type": "object"
}

Teamsへの通知の設定

次にTeamsの通知の設定をする。便利なことにLogic AppにはすでにTeamsを操作するための部品が用意されている。その中から今回は Post a message (V3) (Preview) を選択。
Teamを選択し、Post先のChannelを選択し、Message欄に通知したい内容を記載する。HTTP request bodyで受け取る項目を使用して下図のように組み立てた。
image.png

通知テスト用のDAGを作成

default_argson_failure_callback を指定すると、どのタスクでも失敗した場合に通知できるようにできる。post_teams_channel という関数を作成して、on_failure_callback が発火したときに呼べるようにした。ここでは成功するタスク success と失敗するタスク failure を両方動かして挙動を調べる。

import airflow
import requests
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Logic AppにHTTP POSTリクエストを送信する。ContextからDAGの情報をrequest bodyに渡す。
def post_teams_channel(context):
    url = "https://<Logic App URL>"
    headers = {'content-type': 'application/json'}
    payload={
       'TaskInstance': str(context['task_instance_key_str']),
       'RunID': str(context['run_id']),
       'ExecDate': str(context['execution_date'])
    }
    r = requests.post(url, headers=headers, json=payload)

# on_failure_callbackをここに実装
args = {
    "owner": "airflow",
    "email": ["airflow@example.com"],
    "depends_on_past": False,
    "on_failure_callback": post_teams_channel,
    "start_date": airflow.utils.dates.days_ago(0)
}

# DAGの作成
dag = DAG(dag_id="teams-notify",
        default_args=args,
        schedule_interval="@daily")

# 成功するタスク "success"
t1 = BashOperator(
    task_id='success',
    bash_command='exit 0',
    dag=dag,
)

# 失敗するタスク "failure"
t2 = BashOperator(
    task_id='failure',
    bash_command='exit 1',
    dag=dag,
)

# タスクの依存関係定義
t1 >> t2

通知テスト

準備が完了したので実際にDAGをトリガーして動作をテストしてみる。以下のコマンドを実行して直ちにトリガーする。

airflow dags trigger teams-notify

しばらくすると task_id : success が成功し、task_id : failure が失敗している結果が確認できる。
image.png
TeamsにはLogic Appで定義した内容でメッセージがPostされていることが確認できる。
image.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?