はじめに
この記事では、インストール方法や初期設定などは紹介しません。Airflowのジョブの実行について困っている方向けに書いています。
バージョン
- airflow 1.8.0
- python 3.6.0
catchup=False にしても過去のジョブが実行される
- 例えば、毎日18:00にbashを実行するようなジョブがあったとして
1/2 19:00 ジョブ停止
1/5 19:00 ジョブ再開
としたら普通は1/6 18:00に次のジョブが実行されると考えますが、Airflowではcatchup=Falseにしてもジョブを再開してすぐに1/5 18:00の分のジョブが実行されるようにできています。また、1/5 17:00にジョブを再開すると1/4 18:00の分のジョブが実行されます。つまり、1つ前のジョブが実行されてしまうということです...
対処方法
-
lib/python3.6/site-packages/airflow/jobs.py
の809行目あたりを書き換える。
書き換え前
@provide_session
def create_dag_run(self, dag, session=None):
"""
This method checks whether a new DagRun needs to be created
for a DAG based on scheduling interval
Returns DagRun if one is scheduled. Otherwise returns None.
"""
...
next_run_date = None
if not last_scheduled_run:
# First run
task_start_dates = [t.start_date for t in dag.tasks]
if task_start_dates:
next_run_date = dag.normalize_schedule(min(task_start_dates))
self.logger.debug("Next run date based on tasks {}"
.format(next_run_date))
else:
next_run_date = dag.following_schedule(last_scheduled_run)
# make sure backfills are also considered
last_run = dag.get_last_dagrun(session=session)
...
書き換え後
@provide_session
def create_dag_run(self, dag, session=None):
"""
This method checks whether a new DagRun needs to be created
for a DAG based on scheduling interval
Returns DagRun if one is scheduled. Otherwise returns None.
"""
...
next_run_date = None
task_start_dates = [t.start_date for t in dag.tasks]
if task_start_dates:
next_run_date = dag.normalize_schedule(min(task_start_dates))
self.logger.debug("Next run date based on tasks {}"
.format(next_run_date))
# make sure backfills are also considered
last_run = dag.get_last_dagrun(session=session)
...
これで意図しないジョブの実行を防ぐことができます。
サンプルコード
- example.shを毎日18:00に実行するジョブを定義します。
- bashコマンドを実行したいときは
BashOperator
を使います。
airflow/dag/test.py
import airflow
from airflow import utils
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import timedelta,datetime
default_args = {
'owner': 'airflow',
'start_date': datetime.now() - timedelta(days=1)
}
dag = DAG(
dag_id='test',
default_args=default_args,
schedule_interval='0 18 * * *',
catchup=False
)
task1 = BashOperator(
task_id='task1',
bash_command='bash example.sh ',
dag=dag)
-
重要
- 毎日実行するタスクなら、start_dateはちょうど24時間前になるように
datetime.now() - timedelta(days=1)
にする。 - 毎週実行するタスクなら、ちょうど1週間前
datetime.now() - timedelta(days=7)
を指定する。
- 毎日実行するタスクなら、start_dateはちょうど24時間前になるように
実行
$ airflow webserver -p 8080 &
$ airflow scheduler &