はじめに
AWSのManaged AirflowサービスであるMWAAを試してみる
構築
MWAAの構築はクラメソさんの記事で済ませます
https://dev.classmethod.jp/articles/amazon-managed-workflows-for-apache-airflow-mwaa-ga/
流れ
以下の記事で行っているGlueジョブ実行とGlueクローラ実行をAirflowでやってみます。GlueジョブやS3バケットなどは既にあるものとして進めます。作成したい場合は以下のqiitaで再現は出来ると思いますので割愛します。
https://qiita.com/pioho07/items/523aec26ca5dc5bc9697
登場リソース
- 入力ファイル:19行のcsvファイルでS3にアップ済
- Glueジョブ名:se2_job1
- Glueクローラー名:se2_out1
処理内容:
se2_job1 Glueジョブで、csvファイルをParquetに変換します。
se2_out1 Glueクローラで、出力されたParquetファイルからテーブルを作成します。
Airflowでは、se2_job1 -> se2_out1 の順番で実行するシンプルなワークフローです。
今回使うサンプルログファイル(19件)
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,1,JP,2017,12,14,12
android,11112,1,FR,2017,12,14,14
iphone,11113,9,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14
ワークフロー作成
MWAAの"DAGフォルダ"で指定したS3パスに以下のDAGコードをアップロードします。
※テストなのでかなり緩いエラーハンドリングです。
import boto3
import time
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
args = {
"owner": "airflow",
"start_date": airflow.utils.dates.days_ago(1),
"provide_context": False,
}
etl_job_name = "se2_job1"
etl_crwl_name = "se2_out1"
glue_client = boto3.client('glue')
def glue_etl_job():
job = glue_client.start_job_run(JobName=etl_job_name)
while True:
status = glue_client.get_job_run(JobName=etl_job_name, RunId=job['JobRunId'])
if status['JobRun']['JobRunState'] == 'SUCCEEDED':
break
time.sleep(60)
def glue_crawler_job():
glue_client.start_crawler(Name=etl_crwl_name)
while True:
status = glue_client.get_crawler(Name=etl_crwl_name)
if status['Crawler']['LastCrawl']['Status'] == 'SUCCEEDED':
break
time.sleep(60)
with DAG(
dag_id="demo_etl_job",
description="Simple glue DAG",
default_args=args,
schedule_interval="*/60 * * * *",
catchup=False,
tags=['demo']
) as dag:
t1 = PythonOperator(task_id="glue_job_step2", python_callable=glue_etl_job)
t2 = PythonOperator(task_id="glue_crawler_step", python_callable=glue_crawler_job)
t1 >> t2
S3にアップすると、だいたい1分未満くらいでAirflowのUIに反映されます。
ジョブは最初Pause状態なので、画面のon/offボタンをクリックしonにします。これによりジョブがアクティブになりスケジュールが行われます
正常に終わるとこんな感じ
DAG名の"demo_etl_job"をクリックするとこんな画面になります。
Glue側もジョブとクローラが正常に終了してます
いろいろ見てみる
シンプルなワークフローなのでそんなに面白くない
ログの確認
画面のジョブの中のタスク(glue_crawler_job)をクリックします。
[View Log]をクリック
ログが見れます。実行したタスクの見たいログがすぐ見れていいですね。他にも必要な機能が集約されたモノリスもいいもんですね。
AWSらしく、同じ内容のログはCloudWatchLogsにも出ています。他の用途に活用しやすいかも
※MWAAの設定でログ出力設定をしてれば出る
中身こんな感じでAirflowで見たのと同じ
List Task Instanceの"Log Url"はクリックするとlocalhost:8080のローカルのリンクで見れなかった
CloudWatchでメトリクスも確認できます。
Browseの箇所でジョブやログのリストアップが出来て便利です。
provide_context
provide_contextは、ジョブトリガーの際に引数を渡す場合Trueに設定するらしい(らしい程度の知識です)
最初Trueに設定してて、Airflowの画面から手動実行(画面の再生ボタンみたいなアイコン)するとエラーしてはまってました。
ログを見るとなんかそれっぽいのが出てて解決できました。
start_date
ジョブ実行時間は、start_date、end_date、schedule_intervalというパラメータで制御します。基本的には start_date から end_date までの間、 schedule_interval の間隔で実行されます。ただクセのある挙動なので以下のブログ記事に詳しく書いてますのでご参考ください。
catchup
DAGプロパティでcatchup=Falseとすると、登録時以前に実行されるはずだったjobのキャッチアップを行わなくなり、登録時点以降からjobがスケジューリングされます。airflow.cfgでcatchup_by_default=Falseに設定すればタスクごとに設定しなくても、デフォルトでcatchupがFalseに設定されます。catchup=Trueとすれば過去のjobを実行できます。
MWAAでairflow.cfgの設定するには、マネコンから以下の"設定オプション"で設定が出来ます。
start_date, schedule_interval, catchup=false
ジョブを有効にした直後(GUIのON)、start_date+schedule_intervalのタイミングでジョブがスケジュールされます。catchup=falseで過去分はキャプチャしないので、例えば10:10に有効化すると9:00のジョブがスケジュールされます。それ以前は過去分なのでスケジュールされない。以後11:00に10:00のジョブがスケジュールされ12:00に・・と続きます。
今後
今回は今後のワークフローを拡張していくためシンプルなものとしました。メールでエラー通知や中途半端な出力ファイルの削除などエラー処理入れたり、変換したデータをRedshiftにロードしたり、AppFlowと繋げたりとか、他の処理を入れてワークフローを拡張していければと思います(多分)
Airflowでメール通知
参考
Airflow 1.10.12の公式
https://airflow.apache.org/docs/1.10.12/index.html
AWSのMWAAの公式
https://docs.aws.amazon.com/mwaa/latest/userguide/what-is-mwaa.html
re:invent2020のMWAAのセッションのクラメソさんの記事
https://dev.classmethod.jp/articles/reinvent2020-emb007-data-pipelines-with-amazon-managed-workflows-for-apache-airflow/
github
https://github.com/apache/airflow
Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f
これ使いたいな
https://pypi.org/project/apache-airflow-providers-amazon/#installation