2行で
- デフォルト設定では
ExternalTaskSensor
のDAGのschedulerは、external_dag_id
のものと全く同じ設定とすること -
ExternalTaskSensor
のDAGのschedulerを変えたい場合は、execution_delta
を設定する
背景
Airflowは1.10.6。
ExternalTaskSensorを使って別のDAGの終了を確認してからタスクを実行するようなDAGを作りたい。
適当に設定するとなんかうまく動かなくて苦労したのでメモ。
動くサンプル1
test1.py
import airflow
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": airflow.utils.dates.days_ago(1),
"execution_timeout": timedelta(minutes=30),
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"external_dag",
default_args=default_args,
catchup=False,
schedule_interval="0 2 * * *",
)
t1 = BashOperator(task_id='task_1', bash_command="echo 'aaa'", dag=dag)
t2 = BashOperator(task_id='task_2', bash_command="echo 'bbb'", dag=dag)
t1 >> t2
test_2.py
import airflow
from datetime import timedelta
from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": airflow.utils.dates.days_ago(1),
"execution_timeout": timedelta(minutes=30),
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"sensor_dag",
default_args=default_args,
catchup=False,
schedule_interval="0 2 * * *",
)
ExternalTaskSensor(
task_id='sensor_1',
external_dag_id='external_dag',
external_task_id=None,
dag=dag
)
external_dag のschedulerと、ExternalTaskSensor の scheduler の設定が同じになっているので動く。1分でも違うと動かないで注意。
external_dag と ExternalTaskSensor で異なるschedulerを設定したい場合
external_dag と ExternalTaskSensor の時間をずらしたい場合も出てくる。その場合は、 ExternalTaskSensor の execution_delta
引数を使う。
例えば、external_dagの schedulerが 0 2 * * *
で、 ExternalTaskSensor が 5 2 * * *
にしたい場合。つまり、5分後にTaskSensorを起動したい場合。
ExternalTaskSensorを以下のように設定する。
test_2.py
# 略
dag = DAG(
"test_2",
default_args=default_args,
catchup=False,
schedule_interval="5 2 * * *",
execution_delta=timedelta(minutes=5),
)
ExternalTaskSensor(
task_id='task_1',
external_dag_id='test_1',
external_task_id=None,
execution_delta=timedelta(minutes=9),
dag=dag
)
差分の5分のtimedelta(minutes=5)
を引数に渡すと動く。気をつけないといけないのが、ぴったり5分じゃないとダメ。1時間とかでも動くだろと思ったけどダメだった。