LoginSignup
5
3

More than 3 years have passed since last update.

Managed Workflows for Apache Airflow (MWAA)でGlueを使ったジョブフロー

Last updated at Posted at 2020-12-03

はじめに

AWSのManaged AirflowサービスであるMWAAを試してみる

構築

MWAAの構築はクラメソさんの記事で済ませます
https://dev.classmethod.jp/articles/amazon-managed-workflows-for-apache-airflow-mwaa-ga/

流れ

以下の記事で行っているGlueジョブ実行とGlueクローラ実行をAirflowでやってみます。GlueジョブやS3バケットなどは既にあるものとして進めます。作成したい場合は以下のqiitaで再現は出来ると思いますので割愛します。
https://qiita.com/pioho07/items/523aec26ca5dc5bc9697

登場リソース

  • 入力ファイル:19行のcsvファイルでS3にアップ済
  • Glueジョブ名:se2_job1
  • Glueクローラー名:se2_out1

処理内容:

se2_job1 Glueジョブで、csvファイルをParquetに変換します。
se2_out1 Glueクローラで、出力されたParquetファイルからテーブルを作成します。
Airflowでは、se2_job1 -> se2_out1 の順番で実行するシンプルなワークフローです。

今回使うサンプルログファイル(19件)

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,1,JP,2017,12,14,12
android,11112,1,FR,2017,12,14,14
iphone,11113,9,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

ワークフロー作成

MWAAの"DAGフォルダ"で指定したS3パスに以下のDAGコードをアップロードします。
※テストなのでかなり緩いエラーハンドリングです。

demo-etl-job
import boto3
import time
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    "owner": "airflow",
    "start_date": airflow.utils.dates.days_ago(1),
    "provide_context": False,
}

etl_job_name = "se2_job1"
etl_crwl_name = "se2_out1"

glue_client = boto3.client('glue')

def glue_etl_job():
    job = glue_client.start_job_run(JobName=etl_job_name)
    while True:
        status = glue_client.get_job_run(JobName=etl_job_name, RunId=job['JobRunId'])
        if status['JobRun']['JobRunState'] == 'SUCCEEDED':
            break
        time.sleep(60)

def glue_crawler_job():
    glue_client.start_crawler(Name=etl_crwl_name)
    while True:
        status = glue_client.get_crawler(Name=etl_crwl_name)
        if status['Crawler']['LastCrawl']['Status'] == 'SUCCEEDED':
            break
        time.sleep(60)


with DAG(
    dag_id="demo_etl_job",
    description="Simple glue DAG",
    default_args=args,
    schedule_interval="*/60 * * * *",
    catchup=False,
    tags=['demo']
) as dag:
    t1 = PythonOperator(task_id="glue_job_step2", python_callable=glue_etl_job)
    t2 = PythonOperator(task_id="glue_crawler_step", python_callable=glue_crawler_job)

    t1 >> t2

S3にアップすると、だいたい1分未満くらいでAirflowのUIに反映されます。

スクリーンショット 0002-12-03 10.26.31.png

ジョブは最初Pause状態なので、画面のon/offボタンをクリックしonにします。これによりジョブがアクティブになりスケジュールが行われます

スクリーンショット 0002-12-03 10.28.28.png

正常に終わるとこんな感じ

スクリーンショット 0002-12-03 10.29.12.png

DAG名の"demo_etl_job"をクリックするとこんな画面になります。

スクリーンショット 0002-12-03 10.30.17.png

Glue側もジョブとクローラが正常に終了してます

スクリーンショット 0002-12-03 20.49.38.png

いろいろ見てみる

スクリーンショット 0002-12-03 10.30.35.png

シンプルなワークフローなのでそんなに面白くない

スクリーンショット 0002-12-03 10.30.55.png

ログの確認

画面のジョブの中のタスク(glue_crawler_job)をクリックします。

スクリーンショット 0002-12-03 10.34.57.png

[View Log]をクリック

スクリーンショット 0002-12-03 10.35.05.png

ログが見れます。実行したタスクの見たいログがすぐ見れていいですね。他にも必要な機能が集約されたモノリスもいいもんですね。

スクリーンショット 0002-12-03 10.36.58.png

AWSらしく、同じ内容のログはCloudWatchLogsにも出ています。他の用途に活用しやすいかも
※MWAAの設定でログ出力設定をしてれば出る

スクリーンショット 0002-12-03 10.38.52.png

中身こんな感じでAirflowで見たのと同じ

スクリーンショット 0002-12-03 10.41.09.png

List Task Instanceの"Log Url"はクリックするとlocalhost:8080のローカルのリンクで見れなかった

スクリーンショット 0002-12-03 10.44.14.png

CloudWatchでメトリクスも確認できます。

スクリーンショット 0002-12-03 10.56.43.png

Browseの箇所でジョブやログのリストアップが出来て便利です。

スクリーンショット 0002-12-03 10.59.33.png

provide_context

provide_contextは、ジョブトリガーの際に引数を渡す場合Trueに設定するらしい(らしい程度の知識です)
最初Trueに設定してて、Airflowの画面から手動実行(画面の再生ボタンみたいなアイコン)するとエラーしてはまってました。

スクリーンショット 0002-12-03 11.21.21.png

ログを見るとなんかそれっぽいのが出てて解決できました。

スクリーンショット 0002-12-03 11.20.05.png
スクリーンショット 0002-12-03 11.19.55.png

start_date

ジョブ実行時間は、start_date、end_date、schedule_intervalというパラメータで制御します。基本的には start_date から end_date までの間、 schedule_interval の間隔で実行されます。ただクセのある挙動なので以下のブログ記事に詳しく書いてますのでご参考ください。

catchup

DAGプロパティでcatchup=Falseとすると、登録時以前に実行されるはずだったjobのキャッチアップを行わなくなり、登録時点以降からjobがスケジューリングされます。airflow.cfgでcatchup_by_default=Falseに設定すればタスクごとに設定しなくても、デフォルトでcatchupがFalseに設定されます。catchup=Trueとすれば過去のjobを実行できます。
MWAAでairflow.cfgの設定するには、マネコンから以下の"設定オプション"で設定が出来ます。

スクリーンショット 0002-12-03 9.19.55.png

start_date, schedule_interval, catchup=false

ジョブを有効にした直後(GUIのON)、start_date+schedule_intervalのタイミングでジョブがスケジュールされます。catchup=falseで過去分はキャプチャしないので、例えば10:10に有効化すると9:00のジョブがスケジュールされます。それ以前は過去分なのでスケジュールされない。以後11:00に10:00のジョブがスケジュールされ12:00に・・と続きます。

今後

今回は今後のワークフローを拡張していくためシンプルなものとしました。メールでエラー通知や中途半端な出力ファイルの削除などエラー処理入れたり、変換したデータをRedshiftにロードしたり、AppFlowと繋げたりとか、他の処理を入れてワークフローを拡張していければと思います(多分)

Airflowでメール通知

参考

Airflow 1.10.12の公式
https://airflow.apache.org/docs/1.10.12/index.html

AWSのMWAAの公式
https://docs.aws.amazon.com/mwaa/latest/userguide/what-is-mwaa.html

re:invent2020のMWAAのセッションのクラメソさんの記事
https://dev.classmethod.jp/articles/reinvent2020-emb007-data-pipelines-with-amazon-managed-workflows-for-apache-airflow/

github
https://github.com/apache/airflow

Glueの使い方まとめ
https://qiita.com/pioho07/items/32f76a16cbf49f9f712f

これ使いたいな
https://pypi.org/project/apache-airflow-providers-amazon/#installation

5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3