背景
Airflowでタスクのarray同士の依存関係を定義したい場合がある。
例えば、 [op1, op2, op3] が終了したら、 [op4, op5, op6] を実行したいなど。
直感的に
[op1, op2, op3] >> [op4, op5, op6]
などと依存関係をDAGで定義しようとすると
unsupported operand type(s) for >>: 'list' and 'list'
というエラーになる。list同士は >> で依存関係は定義できない模様。
Airflowのドキュメントを眺めてみると、cross_downstream
という関数が定義されている。
cross_downstream([op1, op2, op3], [op4, op5, op6])
と定義すると
[op1, op2, op3] >> op4
[op1, op2, op3] >> op5
[op1, op2, op3] >> op6
となる。
これは便利なのだが、3つ以上のarrayの依存関係を持たせようとするとややこしくなる。例えば以下のようにしたい場合。
[op1, op2, op3] >> [op4, op5, op6] >> [op7, op8, op9] >> ....
cross_downstream使って定義するのもしんどい。
3つ以上のarrayタスク郡の依存性の定義方法
DummyOperatorを使った方法が良さそう。
直感的に定義できるし、簡単。
他にいい方法があれば教えて欲しい。
sample.dag
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
default_args = {"owner": "airflow", "start_date": datetime(2020, 12, 7, 0, 0, 0)}
dag = DAG("array_tasks_test", default_args=default_args, schedule_interval=None)
op1 = BashOperator(task_id='op1', bash_command='echo "op1"', dag=dag)
op2 = BashOperator(task_id='op2', bash_command='echo "op2"', dag=dag)
#...略
op9 = BashOperator(task_id='op9', bash_command='echo "op9"', dag=dag)
dummy1 = DummyOperator(task_id="dummy1", dag=dag)
dummy2 = DummyOperator(task_id="dummy2", dag=dag)
dummy3 = DummyOperator(task_id="dummy3", dag=dag)
[op1, op2, op3] >> dummy1 >> [op4, op5, op6] >> dummy2 >> [op7, op8, op9]
Tree Viewはやや複雑になる。