0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MWAAでAWS Batchを非同期実行し、完了をSensorで監視する

Posted at

はじめに

こちらの素晴らしいブログに、PythonSensorを使っての非同期実行の方法が載っていました。

今回はそれをAWS Batchで試してみます。

ちなみにZombie Taskとは、AirflowのタスクがRunning状態だが生存が確認できない(Heartbeatが確認できない)状態でです。まさに生きたまま死んでいるゾンビ状態。

公式ドキュメントでは以下に説明が載っています。Zombie Taskと判定するコードまで載っていて、わかりやすいです。

前提条件

  • Airflowのバージョンはv2.8.1
    • 2024年10月5日現在の最新バージョンはv2.10.1
  • 既にMWAAのインスタンスは作成済み
  • 実行ロールにはAdministratorAccessポリシーを付与している
    • 実運用の際は最小権限を付与する必要があるが、今回は実験のため

やってみる

LLMに聞きながら、以下のコードを作成しました。

from airflow import DAG
from airflow.providers.amazon.aws.operators.batch import BatchOperator
from airflow.providers.amazon.aws.sensors.batch import BatchSensor
from datetime import datetime, timedelta

default_args = {
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('aws_batch_dag',
         default_args=default_args,
         description='Run AWS Batch Job',
         schedule_interval=None,
         start_date=datetime(2023, 5, 20),
         catchup=False
         ) as dag:

    # AWS Batch ジョブ定義と実行
    batch_job = BatchOperator(
        task_id='run_batch_job',
        job_name='airflow-batch-job',
        job_queue='<自身のジョブキューの名前>',
        job_definition='<自身のジョブ定義の名前>',
        aws_conn_id='aws_default'
    )

    # BatchOperatorはデフォルトだと待機するので、Falseにして非同期実行にする
    batch_job.wait_for_completion = False

    # AWS Batch ジョブのステータスを監視
    batch_sensor = BatchSensor(
        task_id='watch_batch_job',
        job_id=batch_job.output,
        aws_conn_id='aws_default'
    )

    batch_job >> batch_sensor

実行してすぐの状態がこちら。
1つ目のTaskはAWS Batchを実行してその結果を待たずにsuccessとなっています。2つ目のTaskのBatchSensorでAWS Batchの状態を監視しています。

image.png

最終的には、どちらもsuccessとなって完了しました。

image.png

このとき、BatchSensorのログとAWS Batchのバッチ詳細を見比べてみます。
右の画像がバッチ詳細で、04:41:51に完了しています。そして左のログを見ると、04:41:53に完了しています。バッチが完了したことを検知して、Taskが完了しているのが分かりますね。

image.png

ちなみに、DAGファイルの以下の部分をコメントアウトして実行してみると、非同期処理にならずBatchOperatorがAWS Batchの実行完了まで待ってしまうので注意してください。

    # BatchOperatorはデフォルトだと待機するので、Falseにして非同期実行にする
    batch_job.wait_for_completion = False

おわりに

Sensorはうまく使うとDAGで実施できる処理の幅が広がりますね。自分で実装すると少し面倒なところなので、うまく使っていきたいところです。
非同期処理にすることのメリットは、再掲の以下のブログにわかりやすく記載がありますので読んでみてください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?