はじめに
こんにちは。ジールの@________________-_です。
airflow2系になってからDAG間の依存関係が、依存関係を定義しているtaskとともに表示されるようになりました。
分かりやすくなってめちゃくちゃいいと思います!
そういうこともあり、DAG間の依存関係を作成できるoperatorを纏めてみました。
環境
airflow breeze:v2.3.0.dev0
デフォルト設定で利用
依存関係を定義するオペレーター達
ExternalTaskSensor
外部のDAG(task)の完了ステータスをポーリングしてくれるsensorです。
sensor対象のDAGが(設定したステータスで)完了したら成功となり後続タスクを実行できます。
複数DAG(task)を親としたDAGを実行したい場合、ExternalTaskSensorを利用すると良いです。
from airflow.sensors.external_task import ExternalTaskSensor
ExternalTaskSensor(
task_id='external_task_sensor_task',
external_dag_id='external_task_sensor_parent',
external_task_id='external_task_sensor_parent_task',
allowed_states=['success'],
failed_states=['failed', 'skipped'],
check_existence=True,
poke_interval=10,
#execution_date_fn=lambda dt: dt + datetime.timedelta(minutes=-1),
execution_delta=datetime.timedelta(minutes=1),
mode='reschedule'
- external_dag_id:sensor対象のdag_id
- external_task_id:sensor対象のtask_id。指定しない場合は対象DAGをsensorします。
- allowed_states:Sensorの成功条件となる対象taskのステータス
- failed_states:Sensorの失敗条件となる対象taskのステータス(ポーリングを繰り返す条件ではなくSensorが失敗となる条件)
- check_existence:Sensor対象のtask有無のチェック。Trueの場合、対象のtaskが存在しないとエラーになる。
- execution_date_fn:sensor対象DAGとsensorのlogical_dateの差を計算する関数。例えば、sensorがsensor対象DAGの1分後に実行される場合は、
execution_date_fn=lambda dt: dt + datetime.timedelta(minutes=-1)
となる。timedelta(minutes=-1)
と指定しているように、sensor対象DAGがどの位先に(後に)実行されるか計算される関数を指定する。 - execution_delta:sensor対象DAGとsensorのlogical_dateの差。例えば、sensorがsensor対象のDAGの1分後に実行される場合は、
datetime.timedelta(minutes=1)
となる。timedelta(minutes=1)
と指定しているように、sensorがどの位先に(後に)実行されるかを指定する。
※execution_date_fnとexecution_deltaは両方指定するとエラーになるので注意。また、指定する時間が逆(sensor目線なのかsensor対象DAG目線なのか)になるので注意。
sensor対象DAGとsensorのlogical_dateが同じ場合は気にしなくて良いのですが、違っている場合はexecution_date_fn
またはexecution_delta
を指定しないとsensorが正しく動かないので注意が必要です。
例えば、sensor対象DAGの10分後にsensorが実行される場合、execution_date_fn
またはexecution_delta
を指定して10分差があることを明示する必要があります。
上記しましたが、利用するオプションによってtimedelta(minutes=-10)
かtimedelta(minutes=10)
の様に指定する時間が逆になるので注意してください。
#DAG間でlogical_dateが10分差の場合(sensorの方が10分後に実行)
execution_date_fn=lambda dt: dt + datetime.timedelta(minutes=-10),
#または
execution_delta=datetime.timedelta(minutes=10),
ExternalTaskMarker
外部DAGのタスクも併せてclear(再実行)出来るようにするためのoperatorです。
例えば、A_dagのExternalTaskMarker_taskが依存関係を設定しているB_dagのB_taskが存在していたとします。Aが親、Bが子の依存関係となります。
その際に、A_taskからdownstreamかつrecursive
を選択してclear(再実行)するとB_dagのB_task(とそのdownstreamタスク)も併せてclearされるようになります。
このようにclearを実行した場合、依存関係のある外部DAG(task)をdownstreamとしてまとめて処理することが出来ます。
from airflow.sensors.external_task import ExternalTaskMarker
ExternalTaskMarker(
task_id="external_task_marker_task",
external_dag_id="external_task_marker_child",
external_task_id="external_task_marker_child_task",
execution_date="{{ dag_run.logical_date + macros.timedelta(minutes=1) }}"
)
- external_dag_id:clearを合わせて実行したい対象DAG_id
- external_task_id:clearを合わせて実行したい対象task_id
- execution_date:clearを合わせて実行したい対象DAGのlogical_date。指定しない場合はExternalTaskMarkerと同じlogical_dateのtaskが対象となる。
external_task_marker_taskからdownstreamとrecursive
を指定してclear実行してみると、外部DAG(task)のexternal_task_marker_child_taskも併せてclear対象となっています。
ただしExternalTaskMarkerの依存関係では、DAG依存関係の見える化対象にはならないようなので注意が必要です。
TriggerDagRunOperator
外部のDAGをキックしてくれるoperatorです。
親DAGの後続処理として複数(または単体)の子DAGを即時実行したい場合、TriggerDagRunOperatorを利用すると良いと思います。
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
TriggerDagRunOperator(
task_id='trigger_dag_run_task',
trigger_dag_id="trigger_dag_run_child",
conf={"message": "testestest"},
)
- trigger_dag_id:トリガー対象のDAG_id
- conf:トリガー対象のDAGに渡すデータ。トリガー対象のDAG内部で利用可能。
おわりに
何らかの理由でDAGに依存関係を持たせたい場合、airflow1系ですと依存関係を見える化できませんでしたが、2系になって見える化出来るようになったのはとてもありがたいです。
ExternalTaskSensorは、子DAG内に複数タスクとして持たせることで複数の親DAGとの依存関係(n対1)を定義できます。
また、親DAGの完了時間と子DAGの実行時間に差を持たせたい場合や、子DAGの実行を日跨ぎさせたくない等の実行タイミングを管理したい場合もExternalTaskSensorの利用が良いと思います。
TriggerDagRunOperatorは、親DAG内に複数タスクとして持たせることで複数の子DAGとの依存関係(1対n)を定義できます。
親DAGの完了時間に合わせて必ず子DAGを実行したい場合等はTriggerDagRunOperatorが良いかもしれません。
※1対1の依存関係の場合はどちらのoperatorでも実装可能。
DAG同士に依存関係を持たせたい場合は、要件に応じて上記operatorを利用してみてください。
上記では紹介していないoperatorのオプションもありますので、詳細は以下参照をご確認下さい。
参照
株式会社ジールでは、「ITリテラシーがない」「初期費用がかけられない」「親切・丁寧な支援がほしい」「ノーコード・ローコードがよい」「運用・保守の手間をかけられない」などのお客様の声を受けて、オールインワン型データ活用プラットフォーム「ZEUSCloud」を月額利用料にてご提供しております。
ご興味がある方は是非下記のリンクをご覧ください: