AirflowではDAG同士の依存関係を定義する方法が2つある。
1つは TriggerDagRunOperator を使う方法と、 ExternalTaskSensor を使う方法だ。
どういう場合にどちらを使うべきか個人的に思うところをまとめる。
TriggerDagRunOperator を使う場合
DAG_A >> DAG_B のように DAGの依存関係が1対1 である場合。
DAG_AがDAG_Bを発火させるのでシンプルでわかりやすい。
ただし、DAG_Aが終了した直後にDAG_Bが発火してしまうので、DAG_Bの発火時刻を指定したい場合には使えない。
triggering.py
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
default_args = {
"owner": "airflow",
"start_date": airflow.utils.dates.days_ago(1),
}
dag = DAG("triggering", default_args=default_args, catchup=False, schedule_interval="0 0 * * *")
t1 = BashOperator(
task_id='t1',
bash_command='echo 1',
dag=dag
)
trigger = TriggerDagRunOperator(
task_id="trigger",
trigger_dag_id="second_dag",
dag=dag,
)
task1 >> trigger
triggered.py
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
default_args = {
"owner": "airflow",
"start_date": airflow.utils.dates.days_ago(1),
}
dag = DAG( "second_dag", default_args=default_args, schedule_interval=None)
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "success trigger"',
schedule_interval=None,
dag=dag,
)
ExternalTaskSensor を使う場合
[DAG_A, DAG_B] >> DAG_C のように DAGが複数DAGに依存する場合。
もしくは、 DAG_Cの発火時刻を指定したい場合。
ExternalTaskSensorはpollingしながらDAG_AとDAG_Bが終了したかをチェックする。
polling中はワーカーは動き続けてしまうので、出来るだけTriggerDagRunOperatorを使う方がいいと思う。
ExternalTaskSensorの注意点
ExternalTaskSensorはスケジューリングに気をつけないと動作しない。
この辺りの詳細とサンプルは以前書いたのでそちらを参照↓