6
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

AirflowでSensorOperator使うときのexecution_timeout, retries, timeout, poke_intervalの使い方

Posted at

TL;DR

  • BaseOperator.execution_timeoutSensorOperator.timeout も同様の挙動をする
    • 汎用的な BaseOperator.execution_timeout を使うことがおすすめ
  • タスクが失敗(タイムアウト/例外)になった場合に、 retry_delay の間隔で retries の回数再実行される
  • SensorOperator では、処理がpassするまで poke_interval の間隔で処理が何度も実行される
    • 処理が passしない ことと、処理が 失敗する ことを分けて考える必要がある
      • 処理がpassしない -> 処理の失敗ではない。poke_interval 毎に処理が実行されるだけ
      • 処理が例外/タイムアウト -> 処理の失敗。retryの対象となる

背景

BaseOperator のコンストラクタ引数には execution_timeoutretries がある。これは、タスクのタイムアウト時間と再実行回数を指定できる。

一方、 xxxSensorOperator のコンストラクタ引数には timeoutpoke_inverval が用意されている。

  • execution_timeouttimeout の違いは何か
  • SensorOperatorの処理がうまくいかなかった場合に、poke_intervalretries の違いは?
    • poke_intervalretry_delay の違いは?

など不明が多いのでまとめる。

1. BaseOperatorの execution_timeout と retriesの動きを確認

シンプルにexecution_timeout時刻でタスクがfailとなり、retriesの回数再実行される。想定通り。


import airflow
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import time

dag = DAG("base_dag")

def sleep_py():
    print('sleep')
    time.sleep(30)
    return 'success'

sp = PythonOperator(
    task_id='sleep_py',
    python_callable=sleep_py,
    execution_timeout=timedelta(seconds=3),
    retries=3,
    retry_delay=timedelta(seconds=1),
    start_date=airflow.utils.dates.days_ago(1),
    dag=dag,
)

# "sleep" が3回出力される

2. SensorOperator(return false)で timeout と poke_interval の動きを確認

poke_intervalの間隔で処理が何度も実行され、timeoutの秒数経過するとタスクがfailとなる。こちらも想定通り。

import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.contrib.sensors.python_sensor import PythonSensor

def false_py():
    print("false_py")
    return False

dag = DAG("sensor_test", schedule_interval="@daily")

sensor = PythonSensor(
        task_id='sensor_4',
        poke_interval=9,
        timeout=60,
        python_callable=false_py,
        start_date=airflow.utils.dates.days_ago(1),
        dag=dag
        )

# false_pyが5回出力される
# false_pyがFalseを返すためSensorの処理がパスしないから(Trueでパス)

3. execution_timeout と timeout を同時に使うとどうなるか

指定秒数が短い方が有効となる。以下の場合だと、短いexecution_timeoutの30秒でタイムアウトとなる。


import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.contrib.sensors.python_sensor import PythonSensor

def false_py():
    print("false_py")
    return False

dag = DAG("sensor_test", schedule_interval="@daily")

sensor = PythonSensor(
        task_id='sensor_5',
        poke_interval=9,
        timeout=60,
        execution_timeout=timedelta(seconds=30),
        python_callable=false_py,
        start_date=airflow.utils.dates.days_ago(1),
        dag=dag
        )
# false_py が2回出力される

4. SensorOperator(return false)に execution_timeout, poke_interval, retries を指定するとどうなるか

poke_intervalの間隔で処理が実行され、execution_timeoutの秒数経過するとタスクfailとなり、retriesで指定された回数再実行される。


import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.contrib.sensors.python_sensor import PythonSensor

def false_py():
    print("false_py")
    return False

dag = DAG("sensor_test", schedule_interval="@daily")

sensor = PythonSensor(
        task_id='sensor_7',
        poke_interval=9,
        execution_timeout=timedelta(seconds=60),
        python_callable=false_py,
        start_date=airflow.utils.dates.days_ago(1),
        retries=3,
        retry_delay=timedelta(seconds=1),
        dag=dag
        )

# false_py が5回出力される ← 3回実行される

5. SensorOperatorに poke_interval と retries が使われている場合に、例外処理するとどうなるか

初回の処理実行で例外となり、retriesの回数だけタスクが再実行されて終わる。


import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.contrib.sensors.python_sensor import PythonSensor

def fail_py():
    1/0
    print("fail_py")
    return False

dag = DAG("sensor_test", schedule_interval="@daily")

sensor = PythonSensor(
        task_id='sensor_7',
        poke_interval=10,
        execution_timeout=timedelta(seconds=60),
        python_callable=fail_py,
        start_date=airflow.utils.dates.days_ago(1),
        retries=3,
        retry_delay=timedelta(seconds=1),
        dag=dag
        )

# 例外が3回飛ぶ
6
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?