3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

AirflowでExternalTaskSensorを使う時に気をつけること

Last updated at Posted at 2020-06-04

2行で

  • デフォルト設定では ExternalTaskSensor のDAGのschedulerは、 external_dag_id のものと全く同じ設定とすること
  • ExternalTaskSensor のDAGのschedulerを変えたい場合は、 execution_delta を設定する

背景

Airflowは1.10.6。
ExternalTaskSensorを使って別のDAGの終了を確認してからタスクを実行するようなDAGを作りたい。
適当に設定するとなんかうまく動かなくて苦労したのでメモ。

動くサンプル1

test1.py

import airflow
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
    "execution_timeout": timedelta(minutes=30),
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "external_dag",
    default_args=default_args,
    catchup=False,
    schedule_interval="0 2 * * *",
)

t1 = BashOperator(task_id='task_1', bash_command="echo 'aaa'", dag=dag)
t2 = BashOperator(task_id='task_2', bash_command="echo 'bbb'", dag=dag)

t1 >> t2

test_2.py
import airflow
from datetime import timedelta
from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
    "execution_timeout": timedelta(minutes=30),
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "sensor_dag",
    default_args=default_args,
    catchup=False,
    schedule_interval="0 2 * * *",
)

ExternalTaskSensor(
    task_id='sensor_1',
    external_dag_id='external_dag',
    external_task_id=None,
    dag=dag
)

external_dag のschedulerと、ExternalTaskSensor の scheduler の設定が同じになっているので動く。1分でも違うと動かないで注意。

external_dag と ExternalTaskSensor で異なるschedulerを設定したい場合

external_dag と ExternalTaskSensor の時間をずらしたい場合も出てくる。その場合は、 ExternalTaskSensor の execution_delta 引数を使う。

例えば、external_dagの schedulerが 0 2 * * * で、 ExternalTaskSensor が 5 2 * * * にしたい場合。つまり、5分後にTaskSensorを起動したい場合。

ExternalTaskSensorを以下のように設定する。

test_2.py
# 略

dag = DAG(
    "test_2",
    default_args=default_args,
    catchup=False,
    schedule_interval="5 2 * * *",
    execution_delta=timedelta(minutes=5),
)

ExternalTaskSensor(
    task_id='task_1',
    external_dag_id='test_1',
    external_task_id=None,
    execution_delta=timedelta(minutes=9),
    dag=dag
)

差分の5分のtimedelta(minutes=5) を引数に渡すと動く。気をつけないといけないのが、ぴったり5分じゃないとダメ。1時間とかでも動くだろと思ったけどダメだった。

参考

公式doc
Airflow で異なる DAG 間の依存関係を設定する方法

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?