LoginSignup
2
0

More than 3 years have passed since last update.

Airflowのタスク/DAGが失敗した時にSlackで通知する仕組み

Last updated at Posted at 2020-02-25

背景

スクリーンショット 2020-02-25 18.06.00.png

↑こんな感じでタスクが失敗したらSlackに通知する仕組みを実装したかった

実装

  • Slackへの通知は Webhookを利用したAPIを利用する -> Sending messages using Incoming Webhooks
  • 各TaskへのSlack通知処理の埋め込みはOperatorのコンストラクタ引数の on_failure_callback を利用する
    • DAGの粒度で監視したい場合はDAGのコンストラクタ引数の on_failure_callback に同様に渡せばok
  • slack_noti はpackage化して各DAGで呼び出せるようにすると便利

import airflow
import requests
import json
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

WEBHOOK_URL = "https://hooks.slack.com/services/xxxxx"

def slack_noti(context=""):
    url= WEBHOOK_URL
    requests.post(url, data = json.dumps({
        'username': 'Airflow',
        'channel': 'some_channel',
        'icon_url': 'https://xxx.jpeg',
        'attachments': [{
            'title': 'Failed: {}.{}.{}'.format(context['dag'], context['task'], context['execution_date']),
            "color": 'danger'
            }]
        }))

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
    "retries": 1,
    "on_failure_callback": slack_noti,
    "retry_delay": timedelta(minutes=1)
}

dag = DAG("slack_dag", default_args=default_args, catchup=False, schedule_interval="@daily")

def success_py():
    # 1/0
    return 'success'

def fail_py():
    1/0
    return 'fail'

sp = PythonOperator(
    task_id='success_py',
    python_callable=success_py,
    dag=dag,
)

fp = PythonOperator(
    task_id='fail_py',
    python_callable=fail_py,
    dag=dag,
)

sp >> fp
2
0
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0