LoginSignup
4
1

More than 3 years have passed since last update.

AirflowでDAG間の依存関係の作成方法のまとめ

Last updated at Posted at 2020-06-16

AirflowではDAG同士の依存関係を定義する方法が2つある。
1つは TriggerDagRunOperator を使う方法と、 ExternalTaskSensor を使う方法だ。
どういう場合にどちらを使うべきか個人的に思うところをまとめる。

TriggerDagRunOperator を使う場合

DAG_A >> DAG_B のように DAGの依存関係が1対1 である場合。

DAG_AがDAG_Bを発火させるのでシンプルでわかりやすい。
ただし、DAG_Aが終了した直後にDAG_Bが発火してしまうので、DAG_Bの発火時刻を指定したい場合には使えない。

triggering.py
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator

default_args = {
    "owner": "airflow",
    "start_date": airflow.utils.dates.days_ago(1),
}

dag = DAG("triggering", default_args=default_args, catchup=False, schedule_interval="0 0 * * *")

t1 = BashOperator(
    task_id='t1',
    bash_command='echo 1',
    dag=dag
)

trigger = TriggerDagRunOperator(
    task_id="trigger",
    trigger_dag_id="second_dag", 
    dag=dag,
)

task1 >> trigger
triggered.py
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    "owner": "airflow",
    "start_date": airflow.utils.dates.days_ago(1),
}

dag = DAG( "second_dag", default_args=default_args, schedule_interval=None)

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "success trigger"',
    schedule_interval=None,
    dag=dag,
)

ExternalTaskSensor を使う場合

[DAG_A, DAG_B] >> DAG_C のように DAGが複数DAGに依存する場合。
もしくは、 DAG_Cの発火時刻を指定したい場合

ExternalTaskSensorはpollingしながらDAG_AとDAG_Bが終了したかをチェックする。
polling中はワーカーは動き続けてしまうので、出来るだけTriggerDagRunOperatorを使う方がいいと思う。

ExternalTaskSensorの注意点

ExternalTaskSensorはスケジューリングに気をつけないと動作しない。
この辺りの詳細とサンプルは以前書いたのでそちらを参照↓

AirflowでExternalTaskSensorを使う時に気をつけること

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1