はじめに
Apache Airflowにはタスクが失敗した場合に呼び出すことができる on_failure_callback
という仕組みがある。これを使ってMicrosoft Teamsの特定のチャネルにアラートをPOSTする仕組みを作成する。
Teamsへの通知の仕組み
TeamsにメッセージをPOSTする処理は Logic App で作成し、それを呼び出すようにAirflow側で実装するのが便利そうだ。今回はTeamsにPOSTするだけの処理にしたが、これ以外の通知方法(メールなど)や、自動化処理も必要に応じて簡単に追加できるので、この部分をLogic Appに任せるというのは良いアイデアのように思う。
設定
Logic Appの設定
まずLogic Appを設定する。HTTPリクエストを受信した際に起動するようにトリガーをセットし、TeamsにメッセージをPostするというシンプルなワークフローを定義する。
HTTPリクエスト受信の設定
HTTPリクエストの受信の際に、Request Bodyとして受信するJSONスキーマを定義する。ここではAirflowから渡すことができるContextの一部の値を受け取ることを想定して3つのフィールドを定義している。
定義のJSONのTextは以下の通り。
{
"properties": {
"ExecDate": {
"type": "string"
},
"RunID": {
"type": "string"
},
"TaskInstance": {
"type": "string"
}
},
"type": "object"
}
Teamsへの通知の設定
次にTeamsの通知の設定をする。便利なことにLogic AppにはすでにTeamsを操作するための部品が用意されている。その中から今回は Post a message (V3) (Preview)
を選択。
Teamを選択し、Post先のChannelを選択し、Message欄に通知したい内容を記載する。HTTP request bodyで受け取る項目を使用して下図のように組み立てた。
通知テスト用のDAGを作成
default_args
に on_failure_callback
を指定すると、どのタスクでも失敗した場合に通知できるようにできる。post_teams_channel
という関数を作成して、on_failure_callback
が発火したときに呼べるようにした。ここでは成功するタスク success
と失敗するタスク failure
を両方動かして挙動を調べる。
import airflow
import requests
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# Logic AppにHTTP POSTリクエストを送信する。ContextからDAGの情報をrequest bodyに渡す。
def post_teams_channel(context):
url = "https://<Logic App URL>"
headers = {'content-type': 'application/json'}
payload={
'TaskInstance': str(context['task_instance_key_str']),
'RunID': str(context['run_id']),
'ExecDate': str(context['execution_date'])
}
r = requests.post(url, headers=headers, json=payload)
# on_failure_callbackをここに実装
args = {
"owner": "airflow",
"email": ["airflow@example.com"],
"depends_on_past": False,
"on_failure_callback": post_teams_channel,
"start_date": airflow.utils.dates.days_ago(0)
}
# DAGの作成
dag = DAG(dag_id="teams-notify",
default_args=args,
schedule_interval="@daily")
# 成功するタスク "success"
t1 = BashOperator(
task_id='success',
bash_command='exit 0',
dag=dag,
)
# 失敗するタスク "failure"
t2 = BashOperator(
task_id='failure',
bash_command='exit 1',
dag=dag,
)
# タスクの依存関係定義
t1 >> t2
通知テスト
準備が完了したので実際にDAGをトリガーして動作をテストしてみる。以下のコマンドを実行して直ちにトリガーする。
airflow dags trigger teams-notify
しばらくすると task_id : success
が成功し、task_id : failure
が失敗している結果が確認できる。
TeamsにはLogic Appで定義した内容でメッセージがPostされていることが確認できる。