恥ずかしながら最近になって知ったワークフローエンジン Apache Airflow。日本語の紹介記事もちらほら出てきていますが、公式ドキュメントをちょっとずつ抄訳しながら読んでいこうと思います。
4回目の今回はサブDAG(SubDAGs)。
バージョン2.3.2時点のものです。
サブDAG(SubDAGs)
時折、複数のDAGに都度都度まったく同じタスクのセットを追加していることに気がついたり、多数のタスクを1つの論理的なグループにまとめたいと感じたりすることがあるでしょう。サブDAG(SubDAGs)はこのような時のために提供されています。
例えば、こちらのDAGは多数の並列タスクからなる2つのセクションを持っています:
これらの並列タスクを1つのサブDAGにまとめることができます。結果的にDAGは次のようになるでしょう:
サブDAGオペレーターの.pyファイルはDAGオブジェクトを返すファクトリーメソッドを含んでいる必要がある点に注意してください。これがないとAirflowはサブDAGをそれ単体で実行可能なDAGにように取り扱ってしまいます。Airflowは.pyファイルのトップレベルにDAGオブジェクトを発見すると、単体で実行可能な通常のDAGとしてロードします。
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
def subdag(parent_dag_name, child_dag_name, args):
"""
サブDAGを生成する
:param str parent_dag_name: 親DAGのID
:param str child_dag_name: 子DAGのID
:param dict args: サブDAGに提供されたデフォルトの引数(arguments)
:return: サブDAG
:rtype: airflow.models.DAG
"""
dag_subdag = DAG(
dag_id=f'{parent_dag_name}.{child_dag_name}',
default_args=args,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
schedule_interval="@daily",
)
for i in range(5):
EmptyOperator(
task_id=f'{child_dag_name}-task-{i + 1}',
default_args=args,
dag=dag_subdag,
)
return dag_subdag
こうして定義したサブDAGはメインのDAGから参照できます:
import datetime
from airflow import DAG
from airflow.example_dags.subdags.subdag import subdag
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator
DAG_NAME = 'example_subdag_operator'
with DAG(
dag_id=DAG_NAME,
default_args={"retries": 2},
start_date=datetime.datetime(2022, 1, 1),
schedule_interval="@once",
tags=['example'],
) as dag:
start = EmptyOperator(
task_id='start',
)
section_1 = SubDagOperator(
task_id='section-1',
subdag=subdag(DAG_NAME, 'section-1', dag.default_args),
)
some_other_task = EmptyOperator(
task_id='some-other-task',
)
section_2 = SubDagOperator(
task_id='section-2',
subdag=subdag(DAG_NAME, 'section-2', dag.default_args),
)
end = EmptyOperator(
task_id='end',
)
start >> section_1 >> some_other_task >> section_2 >> end
メインのDAGのグラフ表示でSubDagOperator
にズームインすると、サブDAGに含まれるタスクを確認することができます:
サブDAGを使用するときに役立つその他の情報:
- 慣例により、サブDAGの
dag_id
の先頭には親DAGの名前とドットをつけるべきです(parent.child
)。 - メインのDAGとサブDAGとの間で引数(arguments)を共有すべきです(先程のサンプルコードで示したように)。
- サブDAGはスケジュールを持ち有効化されていなくてはなりません。サブDAGのスケジュールが
None
もしくは@once
の場合、サブDAGは何もせず先に進んでしまいます。 -
SubDagOperator
をクリアすると、その内部のタスクの状態もクリアされます。 -
SubDagOperator
を成功とマークしても、その内部のタスクの状態には反映されません。 - サブDAGの内部のタスクで
depends_on_past
を利用するのはやめましょう。混乱を招きます。 - サブDAGを実行するエグゼキューターを指定できます。サブDAGを一連の処理の途中で、並列度数を1に制限しつつ実行する場合、SequentialExecutorの使用が一般的です。LocalExecutorを使用すると、ワーカーがキャパシティ超過となり、単一スロットで複数タスクが実行される問題が生じる可能性があります。
airflow/example_dags
で実際の動きを確認できます。
注意
SubDagOperator
は並列度数の制限に従いません。このためあなたが設定した制限を超えたリソース消費を引き起こす可能性があります。
便利ではあるもののいろいろ使い方に注意が必要そうですね。。
そして本題からは逸れますが、Pythonのような動的型付け言語を土台とするAirflowが「DAGを返すファクトリーメソッド」を検知する仕組みはどのようになっているのでしょうか。。。