#Dags間の連携について
スケジュールを設定していくと、DAGS間でAが終わったらBを動かしたいなど、依存関係を設定たくなる時があると思います。
そのような時の書き方について記載します。
ExternalTaskSensor
を使えばできるのですが、ポイントが2つあって、
a. secondDagで、firstDagのTaskを指定する
b. schedule_interval
は同じ時間に起動するようにする
が必要となります。
実際に作って起動させてみるとわかるのですが、secondDagは待ち状態の形で起動します。
そして同じ時間に起動したDagの指定したTaskが終わるのを待っています。
テストする時はたいてい手動で動かすと思うのですが、そうするとみごとに引っかかって動かないです。
スケジュールの1番目
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
default_args = {'owner': 'airflow', 'start_date': datetime(2018, 9, 21)}
dag = DAG('firstDag', default_args=default_args,schedule_interval='0 1 * * *')
DummyOperator(
task_id='Call_secondDag'
, dag=dag
)
スケジュールの2番目
from airflow import DAG
from airflow.operators.sensors import ExternalTaskSensor
default_args = {'owner': 'airflow', 'start_date': datetime(2018, 9, 21)}
dag = DAG('secondDag', default_args=default_args,schedule_interval='0 1 * * *')
ExternalTaskSensor(
task_id='Start'
, external_dag_id='firstDag'
, external_task_id='Call_secondDag'
, dag=dag
)