LoginSignup
8
3

More than 5 years have passed since last update.

【Airflow】過去のジョブが実行されないようにする

Last updated at Posted at 2018-09-05

はじめに

 この記事では、インストール方法や初期設定などは紹介しません。Airflowのジョブの実行について困っている方向けに書いています。

バージョン

  • airflow 1.8.0
  • python 3.6.0

catchup=False にしても過去のジョブが実行される

  • 例えば、毎日18:00にbashを実行するようなジョブがあったとして
    1/2 19:00 ジョブ停止
    1/5 19:00 ジョブ再開
    としたら普通は1/6 18:00に次のジョブが実行されると考えますが、Airflowではcatchup=Falseにしてもジョブを再開してすぐに1/5 18:00の分のジョブが実行されるようにできています。また、1/5 17:00にジョブを再開すると1/4 18:00の分のジョブが実行されます。つまり、1つ前のジョブが実行されてしまうということです...

対処方法

  • lib/python3.6/site-packages/airflow/jobs.pyの809行目あたりを書き換える。
書き換え前
@provide_session
    def create_dag_run(self, dag, session=None):
        """
        This method checks whether a new DagRun needs to be created
        for a DAG based on scheduling interval
        Returns DagRun if one is scheduled. Otherwise returns None.
        """
       ...
            next_run_date = None
            if not last_scheduled_run:
                # First run
                task_start_dates = [t.start_date for t in dag.tasks]
                if task_start_dates:
                    next_run_date = dag.normalize_schedule(min(task_start_dates))
                    self.logger.debug("Next run date based on tasks {}"
                                      .format(next_run_date))
            else:
                next_run_date = dag.following_schedule(last_scheduled_run)

            # make sure backfills are also considered
            last_run = dag.get_last_dagrun(session=session)
       ...
書き換え後
@provide_session
    def create_dag_run(self, dag, session=None):
        """
        This method checks whether a new DagRun needs to be created
        for a DAG based on scheduling interval
        Returns DagRun if one is scheduled. Otherwise returns None.
        """
       ...
            next_run_date = None
            task_start_dates = [t.start_date for t in dag.tasks]
            if task_start_dates:
                next_run_date = dag.normalize_schedule(min(task_start_dates))
                self.logger.debug("Next run date based on tasks {}"
                                  .format(next_run_date))

            # make sure backfills are also considered
            last_run = dag.get_last_dagrun(session=session)

       ...

これで意図しないジョブの実行を防ぐことができます。

サンプルコード

  • example.shを毎日18:00に実行するジョブを定義します。
  • bashコマンドを実行したいときはBashOperatorを使います。
airflow/dag/test.py
import airflow
from airflow import utils
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import timedelta,datetime



default_args = {
    'owner': 'airflow',
    'start_date': datetime.now() - timedelta(days=1)
}

dag = DAG(
    dag_id='test',
    default_args=default_args,
    schedule_interval='0 18 * * *',
    catchup=False
    )

task1 = BashOperator(
    task_id='task1',
    bash_command='bash example.sh ',
    dag=dag)

  • 重要
    • 毎日実行するタスクなら、start_dateはちょうど24時間前になるようにdatetime.now() - timedelta(days=1)にする。
    • 毎週実行するタスクなら、ちょうど1週間前datetime.now() - timedelta(days=7)を指定する。

実行

$ airflow webserver -p 8080 &
$ airflow scheduler &
8
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
3