TL;DR
-
BaseOperator.execution_timeout
もSensorOperator.timeout
も同様の挙動をする- 汎用的な
BaseOperator.execution_timeout
を使うことがおすすめ
- 汎用的な
- タスクが失敗(タイムアウト/例外)になった場合に、
retry_delay
の間隔でretries
の回数再実行される -
SensorOperator
では、処理がpassするまでpoke_interval
の間隔で処理が何度も実行される- 処理が
passしない
ことと、処理が失敗する
ことを分けて考える必要がある- 処理がpassしない -> 処理の失敗ではない。
poke_interval
毎に処理が実行されるだけ - 処理が例外/タイムアウト -> 処理の失敗。retryの対象となる
- 処理がpassしない -> 処理の失敗ではない。
- 処理が
背景
BaseOperator
のコンストラクタ引数には execution_timeout
と retries
がある。これは、タスクのタイムアウト時間と再実行回数を指定できる。
一方、 xxxSensorOperator
のコンストラクタ引数には timeout
とpoke_inverval
が用意されている。
-
execution_timeout
とtimeout
の違いは何か - SensorOperatorの処理がうまくいかなかった場合に、
poke_interval
とretries
の違いは?-
poke_interval
とretry_delay
の違いは?
-
など不明が多いのでまとめる。
1. BaseOperatorの execution_timeout と retriesの動きを確認
シンプルにexecution_timeout時刻でタスクがfailとなり、retriesの回数再実行される。想定通り。
import airflow
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import time
dag = DAG("base_dag")
def sleep_py():
print('sleep')
time.sleep(30)
return 'success'
sp = PythonOperator(
task_id='sleep_py',
python_callable=sleep_py,
execution_timeout=timedelta(seconds=3),
retries=3,
retry_delay=timedelta(seconds=1),
start_date=airflow.utils.dates.days_ago(1),
dag=dag,
)
# "sleep" が3回出力される
2. SensorOperator(return false)で timeout と poke_interval の動きを確認
poke_intervalの間隔で処理が何度も実行され、timeoutの秒数経過するとタスクがfailとなる。こちらも想定通り。
import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.contrib.sensors.python_sensor import PythonSensor
def false_py():
print("false_py")
return False
dag = DAG("sensor_test", schedule_interval="@daily")
sensor = PythonSensor(
task_id='sensor_4',
poke_interval=9,
timeout=60,
python_callable=false_py,
start_date=airflow.utils.dates.days_ago(1),
dag=dag
)
# false_pyが5回出力される
# false_pyがFalseを返すためSensorの処理がパスしないから(Trueでパス)
3. execution_timeout と timeout を同時に使うとどうなるか
指定秒数が短い方が有効となる。以下の場合だと、短いexecution_timeoutの30秒でタイムアウトとなる。
import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.contrib.sensors.python_sensor import PythonSensor
def false_py():
print("false_py")
return False
dag = DAG("sensor_test", schedule_interval="@daily")
sensor = PythonSensor(
task_id='sensor_5',
poke_interval=9,
timeout=60,
execution_timeout=timedelta(seconds=30),
python_callable=false_py,
start_date=airflow.utils.dates.days_ago(1),
dag=dag
)
# false_py が2回出力される
4. SensorOperator(return false)に execution_timeout, poke_interval, retries を指定するとどうなるか
poke_intervalの間隔で処理が実行され、execution_timeoutの秒数経過するとタスクfailとなり、retriesで指定された回数再実行される。
import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.contrib.sensors.python_sensor import PythonSensor
def false_py():
print("false_py")
return False
dag = DAG("sensor_test", schedule_interval="@daily")
sensor = PythonSensor(
task_id='sensor_7',
poke_interval=9,
execution_timeout=timedelta(seconds=60),
python_callable=false_py,
start_date=airflow.utils.dates.days_ago(1),
retries=3,
retry_delay=timedelta(seconds=1),
dag=dag
)
# false_py が5回出力される ← 3回実行される
5. SensorOperatorに poke_interval と retries が使われている場合に、例外処理するとどうなるか
初回の処理実行で例外となり、retriesの回数だけタスクが再実行されて終わる。
import airflow
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.contrib.sensors.python_sensor import PythonSensor
def fail_py():
1/0
print("fail_py")
return False
dag = DAG("sensor_test", schedule_interval="@daily")
sensor = PythonSensor(
task_id='sensor_7',
poke_interval=10,
execution_timeout=timedelta(seconds=60),
python_callable=fail_py,
start_date=airflow.utils.dates.days_ago(1),
retries=3,
retry_delay=timedelta(seconds=1),
dag=dag
)
# 例外が3回飛ぶ