8
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Airflow】DAG間の依存関係を作るには。

Last updated at Posted at 2022-04-05

はじめに

こんにちは。ジールの@________________-_です。
airflow2系になってからDAG間の依存関係が、依存関係を定義しているtaskとともに表示されるようになりました。
image.png
分かりやすくなってめちゃくちゃいいと思います!
そういうこともあり、DAG間の依存関係を作成できるoperatorを纏めてみました。

環境

airflow breeze:v2.3.0.dev0
デフォルト設定で利用

依存関係を定義するオペレーター達

ExternalTaskSensor

外部のDAG(task)の完了ステータスをポーリングしてくれるsensorです。
sensor対象のDAGが(設定したステータスで)完了したら成功となり後続タスクを実行できます。
複数DAG(task)を親としたDAGを実行したい場合、ExternalTaskSensorを利用すると良いです。

from airflow.sensors.external_task import ExternalTaskSensor

ExternalTaskSensor(
    task_id='external_task_sensor_task',
    external_dag_id='external_task_sensor_parent',
    external_task_id='external_task_sensor_parent_task',
    allowed_states=['success'],
    failed_states=['failed', 'skipped'],
    check_existence=True,
    poke_interval=10,
    #execution_date_fn=lambda dt: dt + datetime.timedelta(minutes=-1),
    execution_delta=datetime.timedelta(minutes=1),
    mode='reschedule'
  • external_dag_id:sensor対象のdag_id
  • external_task_id:sensor対象のtask_id。指定しない場合は対象DAGをsensorします。
  • allowed_states:Sensorの成功条件となる対象taskのステータス
  • failed_states:Sensorの失敗条件となる対象taskのステータス(ポーリングを繰り返す条件ではなくSensorが失敗となる条件)
  • check_existence:Sensor対象のtask有無のチェック。Trueの場合、対象のtaskが存在しないとエラーになる。
  • execution_date_fn:sensor対象DAGとsensorのlogical_dateの差を計算する関数。例えば、sensorがsensor対象DAGの1分後に実行される場合は、execution_date_fn=lambda dt: dt + datetime.timedelta(minutes=-1) となる。timedelta(minutes=-1)と指定しているように、sensor対象DAGがどの位先に(後に)実行されるか計算される関数を指定する。
  • execution_delta:sensor対象DAGとsensorのlogical_dateの差。例えば、sensorがsensor対象のDAGの1分後に実行される場合は、datetime.timedelta(minutes=1)となる。timedelta(minutes=1)と指定しているように、sensorがどの位先に(後に)実行されるかを指定する。
    ※execution_date_fnとexecution_deltaは両方指定するとエラーになるので注意。また、指定する時間が逆(sensor目線なのかsensor対象DAG目線なのか)になるので注意。

sensor対象DAGとsensorのlogical_dateが同じ場合は気にしなくて良いのですが、違っている場合はexecution_date_fnまたはexecution_deltaを指定しないとsensorが正しく動かないので注意が必要です。
例えば、sensor対象DAGの10分後にsensorが実行される場合、execution_date_fnまたはexecution_deltaを指定して10分差があることを明示する必要があります。
上記しましたが、利用するオプションによってtimedelta(minutes=-10)timedelta(minutes=10)の様に指定する時間が逆になるので注意してください。

#DAG間でlogical_dateが10分差の場合(sensorの方が10分後に実行)
execution_date_fn=lambda dt: dt + datetime.timedelta(minutes=-10),
#または
execution_delta=datetime.timedelta(minutes=10),

ExternalTaskMarker

外部DAGのタスクも併せてclear(再実行)出来るようにするためのoperatorです。
例えば、A_dagのExternalTaskMarker_taskが依存関係を設定しているB_dagのB_taskが存在していたとします。Aが親、Bが子の依存関係となります。
その際に、A_taskからdownstreamかつrecursiveを選択してclear(再実行)するとB_dagのB_task(とそのdownstreamタスク)も併せてclearされるようになります。
このようにclearを実行した場合、依存関係のある外部DAG(task)をdownstreamとしてまとめて処理することが出来ます。

from airflow.sensors.external_task import ExternalTaskMarker

ExternalTaskMarker(
    task_id="external_task_marker_task",
    external_dag_id="external_task_marker_child",
    external_task_id="external_task_marker_child_task",
    execution_date="{{ dag_run.logical_date  + macros.timedelta(minutes=1) }}"
    )
  • external_dag_id:clearを合わせて実行したい対象DAG_id
  • external_task_id:clearを合わせて実行したい対象task_id
  • execution_date:clearを合わせて実行したい対象DAGのlogical_date。指定しない場合はExternalTaskMarkerと同じlogical_dateのtaskが対象となる。

external_task_marker_taskからdownstreamとrecursiveを指定してclear実行してみると、外部DAG(task)のexternal_task_marker_child_taskも併せてclear対象となっています。
image.png

ただしExternalTaskMarkerの依存関係では、DAG依存関係の見える化対象にはならないようなので注意が必要です。

image.png

TriggerDagRunOperator

外部のDAGをキックしてくれるoperatorです。
親DAGの後続処理として複数(または単体)の子DAGを即時実行したい場合、TriggerDagRunOperatorを利用すると良いと思います。

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

TriggerDagRunOperator(
    task_id='trigger_dag_run_task',
    trigger_dag_id="trigger_dag_run_child",
    conf={"message": "testestest"},
)
  • trigger_dag_id:トリガー対象のDAG_id
  • conf:トリガー対象のDAGに渡すデータ。トリガー対象のDAG内部で利用可能。

おわりに

何らかの理由でDAGに依存関係を持たせたい場合、airflow1系ですと依存関係を見える化できませんでしたが、2系になって見える化出来るようになったのはとてもありがたいです。

ExternalTaskSensorは、子DAG内に複数タスクとして持たせることで複数の親DAGとの依存関係(n対1)を定義できます。
また、親DAGの完了時間と子DAGの実行時間に差を持たせたい場合や、子DAGの実行を日跨ぎさせたくない等の実行タイミングを管理したい場合もExternalTaskSensorの利用が良いと思います。
TriggerDagRunOperatorは、親DAG内に複数タスクとして持たせることで複数の子DAGとの依存関係(1対n)を定義できます。
親DAGの完了時間に合わせて必ず子DAGを実行したい場合等はTriggerDagRunOperatorが良いかもしれません。
※1対1の依存関係の場合はどちらのoperatorでも実装可能。

DAG同士に依存関係を持たせたい場合は、要件に応じて上記operatorを利用してみてください。
上記では紹介していないoperatorのオプションもありますので、詳細は以下参照をご確認下さい。

参照


株式会社ジールでは、「ITリテラシーがない」「初期費用がかけられない」「親切・丁寧な支援がほしい」「ノーコード・ローコードがよい」「運用・保守の手間をかけられない」などのお客様の声を受けて、オールインワン型データ活用プラットフォーム「ZEUSCloud」を月額利用料にてご提供しております。
ご興味がある方は是非下記のリンクをご覧ください:

8
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?