背景
↑こんな感じでタスクが失敗したらSlackに通知する仕組みを実装したかった
実装
- Slackへの通知は Webhookを利用したAPIを利用する -> Sending messages using Incoming Webhooks
- 各TaskへのSlack通知処理の埋め込みはOperatorのコンストラクタ引数の on_failure_callback を利用する
- DAGの粒度で監視したい場合はDAGのコンストラクタ引数の on_failure_callback に同様に渡せばok
-
slack_noti
はpackage化して各DAGで呼び出せるようにすると便利
import airflow
import requests
import json
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
WEBHOOK_URL = "https://hooks.slack.com/services/xxxxx"
def slack_noti(context=""):
url= WEBHOOK_URL
requests.post(url, data = json.dumps({
'username': 'Airflow',
'channel': 'some_channel',
'icon_url': 'https://xxx.jpeg',
'attachments': [{
'title': 'Failed: {}.{}.{}'.format(context['dag'], context['task'], context['execution_date']),
"color": 'danger'
}]
}))
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": airflow.utils.dates.days_ago(1),
"retries": 1,
"on_failure_callback": slack_noti,
"retry_delay": timedelta(minutes=1)
}
dag = DAG("slack_dag", default_args=default_args, catchup=False, schedule_interval="@daily")
def success_py():
# 1/0
return 'success'
def fail_py():
1/0
return 'fail'
sp = PythonOperator(
task_id='success_py',
python_callable=success_py,
dag=dag,
)
fp = PythonOperator(
task_id='fail_py',
python_callable=fail_py,
dag=dag,
)
sp >> fp