LoginSignup
0
0

More than 3 years have passed since last update.

Managed Workflows for Apache Airflow (MWAA)でメール通知

Last updated at Posted at 2020-12-13

メール通知

ベースのワークフローはこちら
https://qiita.com/pioho07/items/97ebd1351916177d50f3

ここにエラー時のメール通知と、成功時のメール通知処理を入れていきます。

今回はSMTPサーバーにSESを使う

SESでSMTPセッティング

以下の画面の"STMP Settings"をクリックし、[Create My SMTP Credentials]をクリック。
クレデンシャル(中身はユーザー/パスワード)をメモっておく
Server Nameもメモっておく(email-smtp.ap-northeast-1.amazonaws.com)

スクリーンショット 0002-12-13 15.34.30.png

SESでメアドのVerifyしておく

SESはVerifyしたメアドだけ送信許可してるので、今回使うメアドをVerifyします。

"Email Addresses"をクリックし、[Cerify a New Email Address]をクリックする。任意のメアドを入れて、送られて来たメールの本文のリンクをクリックしてVerify完了。

スクリーンショット 0002-12-13 15.36.56.png

MWAA設定値

  • core.default_ui_timezone: Asia/Tokyo
  • email.email_backend: airflow.utils.email.send_email_smtp
  • scheduler.catchup_by_default: False
  • smtp.smtp_host: email-smtp.ap-northeast-1.amazonaws.com (※メモっておいたSESのSMTPエンドポイント)
  • smtp.smtp_mail_from: xxx@example.com
  • smtp.smtp_password: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx (※メモっておいてSMTPのパスワード)
  • smtp.smtp_port: 587
  • smtp.smtp_ssl: False
  • smtp.smtp_starttls: True
  • smtp.smtp_user: AAAAAAAAAAAAAAAAAAAA (※メモっておいてSMTPのユーザー)

こんな感じでairflow.cfgのセクション(coreやsmtpなど)を頭につけてドットを付けて設定オプションを設定する。ちょっと公式ドキュメントと違う。。smtp.smtp_userとかはドキュメントには載ってないが入ってるっぽいです。

スクリーンショット 0002-12-13 15.29.51.png

Airflow DAG

主な変更点は
* email_on_failerで失敗メール送信
* EmailOperatorで成功メール送信
* retries:1でリトライ1回
* 失敗テストはraiseで行う(以下のDAGのraise部分のコメントアウト外す)。

import boto3
import time
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator

args = {
    "owner": "airflow",
    "start_date": airflow.utils.dates.days_ago(1),
    "provide_context": False,
    "email_on_failure": True,
    "email_on_retry": True,
    "retries":1,
    "retry_delay":120,
    "email": "xxxx@example.com"
}

etl_job_name = "se2_job1"
etl_crwl_name = "se2_out1"

glue_client = boto3.client('glue')

def glue_etl_job():
## fail test
##    raise Exception('エラーテスト')
    job = glue_client.start_job_run(JobName=etl_job_name)
    while True:
        status = glue_client.get_job_run(JobName=etl_job_name, RunId=job['JobRunId'])
        if status['JobRun']['JobRunState'] == 'SUCCEEDED':
            break
        time.sleep(60)

def glue_crawler_job():
    glue_client.start_crawler(Name=etl_crwl_name)
    time.sleep(10)
    while True:
        status = glue_client.get_crawler(Name=etl_crwl_name)
        if status['Crawler']['LastCrawl']['Status'] == 'SUCCEEDED':
            break
        time.sleep(60)

with DAG(
    dag_id="demo_etl_job",
    description="Simple glue DAG",
    default_args=args,
    schedule_interval="*/60 * * * *",
#    catchup=False,
    tags=['demo']
) as dag:
    t1 = PythonOperator(task_id="glue_job_step", python_callable=glue_etl_job)
    t2 = PythonOperator(task_id="glue_crawler_step", python_callable=glue_crawler_job)
    t3 = EmailOperator(
    mime_charset='utf-8',
    task_id='success_email_step',
    to='xxx@example.com',
    subject='Airflow processing report',
    html_content='おめでとう。成功です!'
    )

    t1 >> t2>> t3

フロー図

スクリーンショット 0002-12-13 20.18.21.png

テスト

処理が正常に終わり成功メール通知

スクリーンショット 0002-12-13 20.19.29.png

おめでとう。成功です!

1つ目のステップを失敗させて、エラーメール通知

スクリーンショット 0002-12-13 20.21.02.png

Airflowログのエラー出力

1実行時のログ

スクリーンショット 0002-12-13 21.02.11.png

リトライ実行時のログ
リトライまでの間隔も設定どおり120秒くらいなのがざっくりわかる

スクリーンショット 0002-12-13 21.02.17.png

エラー部分

スクリーンショット 0002-12-13 21.04.13.png

受信したエラーメール

1回目

Airflow alert: <TaskInstance: demo_etl_job.glue_job_step 2020-12-13T10:00:00+00:00 [up_for_retry]>

Try 1 out of 2
Exception:
Failed attempt to attach error logs
Log: Link
Host: ip-10-1-3-72.ap-northeast-1.compute.internal
Log file: /usr/local/airflow/logs/demo_etl_job/glue_job_step/2020-12-13T10:00:00+00:00.log
Mark success: Link

2回目

Airflow alert: <TaskInstance: demo_etl_job.glue_job_step 2020-12-13T10:00:00+00:00 [failed]>

Try 2 out of 2
Exception:
Failed attempt to attach error logs
Log: Link
Host: ip-10-1-3-72.ap-northeast-1.compute.internal
Log file: /usr/local/airflow/logs/demo_etl_job/glue_job_step/2020-12-13T10:00:00+00:00.log
Mark success: Link

リトライ1だったので2回メール来た。リトライ0にしたらメールも1回のみでした。

参考

AWSのMWAAの公式(Airflow Configuration)
https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-env-variables.html

github
https://github.com/apache/airflow

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0