TL;DR
-
BaseOperator.execution_timeoutもSensorOperator.timeoutも同様の挙動をする- 汎用的な
BaseOperator.execution_timeoutを使うことがおすすめ
- 汎用的な
- タスクが失敗(タイムアウト/例外)になった場合に、
retry_delayの間隔でretriesの回数再実行される -
SensorOperatorでは、処理がpassするまでpoke_intervalの間隔で処理が何度も実行される- 処理が
passしないことと、処理が失敗することを分けて考える必要がある- 処理がpassしない -> 処理の失敗ではない。
poke_interval毎に処理が実行されるだけ - 処理が例外/タイムアウト -> 処理の失敗。retryの対象となる
- 処理がpassしない -> 処理の失敗ではない。
- 処理が
背景
BaseOperator のコンストラクタ引数には execution_timeout と retries がある。これは、タスクのタイムアウト時間と再実行回数を指定できる。
一方、 xxxSensorOperator のコンストラクタ引数には timeout とpoke_inverval が用意されている。
-
execution_timeoutとtimeoutの違いは何か - SensorOperatorの処理がうまくいかなかった場合に、
poke_intervalとretriesの違いは?-
poke_intervalとretry_delayの違いは?
-
など不明が多いのでまとめる。
1. BaseOperatorの execution_timeout と retriesの動きを確認
シンプルにexecution_timeout時刻でタスクがfailとなり、retriesの回数再実行される。想定通り。
import airflow
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import time
dag = DAG("base_dag")
def sleep_py():
print('sleep')
time.sleep(30)
return 'success'
sp = PythonOperator(
task_id='sleep_py',
python_callable=sleep_py,
execution_timeout=timedelta(seconds=3),
retries=3,
retry_delay=timedelta(seconds=1),
start_date=airflow.utils.dates.days_ago(1),
dag=dag,
)
# "sleep" が3回出力される
2. SensorOperator(return false)で timeout と poke_interval の動きを確認
poke_intervalの間隔で処理が何度も実行され、timeoutの秒数経過するとタスクがfailとなる。こちらも想定通り。
import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.contrib.sensors.python_sensor import PythonSensor
def false_py():
print("false_py")
return False
dag = DAG("sensor_test", schedule_interval="@daily")
sensor = PythonSensor(
task_id='sensor_4',
poke_interval=9,
timeout=60,
python_callable=false_py,
start_date=airflow.utils.dates.days_ago(1),
dag=dag
)
# false_pyが5回出力される
# false_pyがFalseを返すためSensorの処理がパスしないから(Trueでパス)
3. execution_timeout と timeout を同時に使うとどうなるか
指定秒数が短い方が有効となる。以下の場合だと、短いexecution_timeoutの30秒でタイムアウトとなる。
import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.contrib.sensors.python_sensor import PythonSensor
def false_py():
print("false_py")
return False
dag = DAG("sensor_test", schedule_interval="@daily")
sensor = PythonSensor(
task_id='sensor_5',
poke_interval=9,
timeout=60,
execution_timeout=timedelta(seconds=30),
python_callable=false_py,
start_date=airflow.utils.dates.days_ago(1),
dag=dag
)
# false_py が2回出力される
4. SensorOperator(return false)に execution_timeout, poke_interval, retries を指定するとどうなるか
poke_intervalの間隔で処理が実行され、execution_timeoutの秒数経過するとタスクfailとなり、retriesで指定された回数再実行される。
import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.contrib.sensors.python_sensor import PythonSensor
def false_py():
print("false_py")
return False
dag = DAG("sensor_test", schedule_interval="@daily")
sensor = PythonSensor(
task_id='sensor_7',
poke_interval=9,
execution_timeout=timedelta(seconds=60),
python_callable=false_py,
start_date=airflow.utils.dates.days_ago(1),
retries=3,
retry_delay=timedelta(seconds=1),
dag=dag
)
# false_py が5回出力される ← 3回実行される
5. SensorOperatorに poke_interval と retries が使われている場合に、例外処理するとどうなるか
初回の処理実行で例外となり、retriesの回数だけタスクが再実行されて終わる。
import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.contrib.sensors.python_sensor import PythonSensor
def fail_py():
1/0
print("fail_py")
return False
dag = DAG("sensor_test", schedule_interval="@daily")
sensor = PythonSensor(
task_id='sensor_7',
poke_interval=10,
execution_timeout=timedelta(seconds=60),
python_callable=fail_py,
start_date=airflow.utils.dates.days_ago(1),
retries=3,
retry_delay=timedelta(seconds=1),
dag=dag
)
# 例外が3回飛ぶ