airflow

Airflow - タスク登録時に、start_date に過去の日付を指定した場合でも、過去日付分の job が実行されないようにする

More than 1 year has passed since last update.

例えばこんなタスクを書いたとして、

from airflow import DAG
from datetime import datetime
from airflow.operators import BashOperator

dag = DAG('schedule_test', start_date=datetime(2017, 9, 10, 12, 0, 0), schedule_interval='*/5 * * * *')

task = BashOperator(task_id='date', bash_command='echo "time: $(/bin/date)" >> /tmp/output.log ', dag=dag)

これを、start_date に指定した時刻よりに Airflow に登録すると、start_date から現在時刻までの、過去の日付に実行されるはずだった job がすべて登録され、その場で実行されてしまいます。
たとえば、2017/9/11 12:00:00に登録した場合、2017/9/10 12:00:00からの24時間の間に実行されるはずだった288個(24*60/5で288)の job がその場で実行されてしまいます。

これを避けるためには、DAG のプロパティでcatchup=Falseを指定します。こうすると、登録時点以前に実行されるはずだった job のキャッチアップを行わないようになり、登録した時点以降から job がスケジューリングされるようになります。

from airflow import DAG
from datetime import datetime
from airflow.operators import BashOperator

dag = DAG('schedule_test', start_date=datetime(2017, 9, 10, 12, 0, 0), schedule_interval='*/5 * * * *', catchup=False)

task = BashOperator(task_id='date', bash_command='echo "time: $(/bin/date)" >> /tmp/output.log ', dag=dag)

あるいは、airflow.cfgcatchup_by_default=Falseと設定すると、いちいちタスクの DAG で指定しなくても、デフォルトでcatchupがFalseに設定されます。
この場合、逆に過去の job を実行したい場合は、DAG のプロパティで明示的にcatchup=TrueとすればOKです。