はじめに
こちらの素晴らしいブログに、PythonSensorを使っての非同期実行の方法が載っていました。
今回はそれをAWS Batchで試してみます。
ちなみにZombie Taskとは、AirflowのタスクがRunning状態だが生存が確認できない(Heartbeatが確認できない)状態でです。まさに生きたまま死んでいるゾンビ状態。
公式ドキュメントでは以下に説明が載っています。Zombie Taskと判定するコードまで載っていて、わかりやすいです。
前提条件
- Airflowのバージョンはv2.8.1
- 2024年10月5日現在の最新バージョンはv2.10.1
- 既にMWAAのインスタンスは作成済み
- 実行ロールには
AdministratorAccess
ポリシーを付与している- 実運用の際は最小権限を付与する必要があるが、今回は実験のため
やってみる
LLMに聞きながら、以下のコードを作成しました。
from airflow import DAG
from airflow.providers.amazon.aws.operators.batch import BatchOperator
from airflow.providers.amazon.aws.sensors.batch import BatchSensor
from datetime import datetime, timedelta
default_args = {
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG('aws_batch_dag',
default_args=default_args,
description='Run AWS Batch Job',
schedule_interval=None,
start_date=datetime(2023, 5, 20),
catchup=False
) as dag:
# AWS Batch ジョブ定義と実行
batch_job = BatchOperator(
task_id='run_batch_job',
job_name='airflow-batch-job',
job_queue='<自身のジョブキューの名前>',
job_definition='<自身のジョブ定義の名前>',
aws_conn_id='aws_default'
)
# BatchOperatorはデフォルトだと待機するので、Falseにして非同期実行にする
batch_job.wait_for_completion = False
# AWS Batch ジョブのステータスを監視
batch_sensor = BatchSensor(
task_id='watch_batch_job',
job_id=batch_job.output,
aws_conn_id='aws_default'
)
batch_job >> batch_sensor
実行してすぐの状態がこちら。
1つ目のTaskはAWS Batchを実行してその結果を待たずにsuccessとなっています。2つ目のTaskのBatchSensor
でAWS Batchの状態を監視しています。
最終的には、どちらもsuccessとなって完了しました。
このとき、BatchSensor
のログとAWS Batchのバッチ詳細を見比べてみます。
右の画像がバッチ詳細で、04:41:51に完了しています。そして左のログを見ると、04:41:53に完了しています。バッチが完了したことを検知して、Taskが完了しているのが分かりますね。
ちなみに、DAGファイルの以下の部分をコメントアウトして実行してみると、非同期処理にならずBatchOperator
がAWS Batchの実行完了まで待ってしまうので注意してください。
# BatchOperatorはデフォルトだと待機するので、Falseにして非同期実行にする
batch_job.wait_for_completion = False
おわりに
Sensorはうまく使うとDAGで実施できる処理の幅が広がりますね。自分で実装すると少し面倒なところなので、うまく使っていきたいところです。
非同期処理にすることのメリットは、再掲の以下のブログにわかりやすく記載がありますので読んでみてください。