メール通知
ベースのワークフローはこちら
https://qiita.com/pioho07/items/97ebd1351916177d50f3
ここにエラー時のメール通知と、成功時のメール通知処理を入れていきます。
今回はSMTPサーバーにSESを使う
SESでSMTPセッティング
以下の画面の"STMP Settings"をクリックし、[Create My SMTP Credentials]をクリック。
クレデンシャル(中身はユーザー/パスワード)をメモっておく
Server Nameもメモっておく(email-smtp.ap-northeast-1.amazonaws.com)
SESでメアドのVerifyしておく
SESはVerifyしたメアドだけ送信許可してるので、今回使うメアドをVerifyします。
"Email Addresses"をクリックし、[Cerify a New Email Address]をクリックする。任意のメアドを入れて、送られて来たメールの本文のリンクをクリックしてVerify完了。
MWAA設定値
- core.default_ui_timezone: Asia/Tokyo
- email.email_backend: airflow.utils.email.send_email_smtp
- scheduler.catchup_by_default: False
- smtp.smtp_host: email-smtp.ap-northeast-1.amazonaws.com (※メモっておいたSESのSMTPエンドポイント)
- smtp.smtp_mail_from: xxx@example.com
- smtp.smtp_password: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx (※メモっておいてSMTPのパスワード)
- smtp.smtp_port: 587
- smtp.smtp_ssl: False
- smtp.smtp_starttls: True
- smtp.smtp_user: AAAAAAAAAAAAAAAAAAAA (※メモっておいてSMTPのユーザー)
こんな感じでairflow.cfgのセクション(coreやsmtpなど)を頭につけてドットを付けて設定オプションを設定する。ちょっと公式ドキュメントと違う。。smtp.smtp_userとかはドキュメントには載ってないが入ってるっぽいです。
Airflow DAG
主な変更点は
- email_on_failerで失敗メール送信
- EmailOperatorで成功メール送信
- retries:1でリトライ1回
- 失敗テストはraiseで行う(以下のDAGのraise部分のコメントアウト外す)。
import boto3
import time
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
args = {
"owner": "airflow",
"start_date": airflow.utils.dates.days_ago(1),
"provide_context": False,
"email_on_failure": True,
"email_on_retry": True,
"retries":1,
"retry_delay":120,
"email": "xxxx@example.com"
}
etl_job_name = "se2_job1"
etl_crwl_name = "se2_out1"
glue_client = boto3.client('glue')
def glue_etl_job():
## fail test
## raise Exception('エラーテスト')
job = glue_client.start_job_run(JobName=etl_job_name)
while True:
status = glue_client.get_job_run(JobName=etl_job_name, RunId=job['JobRunId'])
if status['JobRun']['JobRunState'] == 'SUCCEEDED':
break
time.sleep(60)
def glue_crawler_job():
glue_client.start_crawler(Name=etl_crwl_name)
time.sleep(10)
while True:
status = glue_client.get_crawler(Name=etl_crwl_name)
if status['Crawler']['LastCrawl']['Status'] == 'SUCCEEDED':
break
time.sleep(60)
with DAG(
dag_id="demo_etl_job",
description="Simple glue DAG",
default_args=args,
schedule_interval="*/60 * * * *",
# catchup=False,
tags=['demo']
) as dag:
t1 = PythonOperator(task_id="glue_job_step", python_callable=glue_etl_job)
t2 = PythonOperator(task_id="glue_crawler_step", python_callable=glue_crawler_job)
t3 = EmailOperator(
mime_charset='utf-8',
task_id='success_email_step',
to='xxx@example.com',
subject='Airflow processing report',
html_content='おめでとう。成功です!'
)
t1 >> t2>> t3
フロー図
テスト
処理が正常に終わり成功メール通知
おめでとう。成功です!
1つ目のステップを失敗させて、エラーメール通知
Airflowログのエラー出力
1実行時のログ
リトライ実行時のログ
リトライまでの間隔も設定どおり120秒くらいなのがざっくりわかる
エラー部分
受信したエラーメール
1回目
Airflow alert: <TaskInstance: demo_etl_job.glue_job_step 2020-12-13T10:00:00+00:00 [up_for_retry]>
Try 1 out of 2
Exception:
Failed attempt to attach error logs
Log: Link
Host: ip-10-1-3-72.ap-northeast-1.compute.internal
Log file: /usr/local/airflow/logs/demo_etl_job/glue_job_step/2020-12-13T10:00:00+00:00.log
Mark success: Link
2回目
Airflow alert: <TaskInstance: demo_etl_job.glue_job_step 2020-12-13T10:00:00+00:00 [failed]>
Try 2 out of 2
Exception:
Failed attempt to attach error logs
Log: Link
Host: ip-10-1-3-72.ap-northeast-1.compute.internal
Log file: /usr/local/airflow/logs/demo_etl_job/glue_job_step/2020-12-13T10:00:00+00:00.log
Mark success: Link
リトライ1だったので2回メール来た。リトライ0にしたらメールも1回のみでした。
参考
AWSのMWAAの公式(Airflow Configuration)
https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-env-variables.html