追記(2020/06/16)
こちらに最新版のまとめを書きました↓
AirflowでDAG間の依存関係の作成方法のまとめ
==追記ここまで==
背景
DAG_A と DAG_B がある場合に、DAG_A が正常終了した後に、DAG_Bが実行されるような依存関係のあるDAGを作成したい。
サンプルコード
triggering.py
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": airflow.utils.dates.days_ago(1),
"retry_delay": timedelta(minutes=5)
}
dag = DAG("first_dag", default_args=default_args, catchup=False, schedule_interval="0 17 * * *")
task1 = BigQueryOperator(
task_id="task1",
sql="./sql/task1.sql",
use_legacy_sql=False,
destination_dataset_table="pj.dataset.table_name",
write_disposition="WRITE_TRUNCATE",
allow_large_results=True,
dag=dag
)
trigger = TriggerDagRunOperator(
task_id="trigger",
trigger_dag_id="second_dag",
dag=dag,
)
task1 >> trigger
triggered.py
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": airflow.utils.dates.days_ago(1),
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
dag = DAG( "second_dag", default_args=default_args, schedule_interval=None)
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "success trigger"',
schedule_interval=None,
dag=dag,
)
trigger.py
の task1
実行後、trigger
タスクで triggered.py
の second_dag
を実行している。
DAGをトリガーするには TriggerDagRunOperator を使う。