0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Airflow資料抄訳(4):サブDAG(SubDAGs)

Last updated at Posted at 2022-07-08

恥ずかしながら最近になって知ったワークフローエンジン Apache Airflow。日本語の紹介記事もちらほら出てきていますが、公式ドキュメントをちょっとずつ抄訳しながら読んでいこうと思います。

4回目の今回はサブDAG(SubDAGs)。
バージョン2.3.2時点のものです。


サブDAG(SubDAGs)

時折、複数のDAGに都度都度まったく同じタスクのセットを追加していることに気がついたり、多数のタスクを1つの論理的なグループにまとめたいと感じたりすることがあるでしょう。サブDAG(SubDAGs)はこのような時のために提供されています。
例えば、こちらのDAGは多数の並列タスクからなる2つのセクションを持っています:

image.png

これらの並列タスクを1つのサブDAGにまとめることができます。結果的にDAGは次のようになるでしょう:

image.png

サブDAGオペレーターの.pyファイルはDAGオブジェクトを返すファクトリーメソッドを含んでいる必要がある点に注意してください。これがないとAirflowはサブDAGをそれ単体で実行可能なDAGにように取り扱ってしまいます。Airflowは.pyファイルのトップレベルにDAGオブジェクトを発見すると、単体で実行可能な通常のDAGとしてロードします。

airflow/example_dags/subdags/subdag.py
import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def subdag(parent_dag_name, child_dag_name, args):
    """
    サブDAGを生成する

    :param str parent_dag_name: 親DAGのID
    :param str child_dag_name: 子DAGのID
    :param dict args: サブDAGに提供されたデフォルトの引数(arguments)
    :return: サブDAG
    :rtype: airflow.models.DAG
    """
    dag_subdag = DAG(
        dag_id=f'{parent_dag_name}.{child_dag_name}',
        default_args=args,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        schedule_interval="@daily",
    )

    for i in range(5):
        EmptyOperator(
            task_id=f'{child_dag_name}-task-{i + 1}',
            default_args=args,
            dag=dag_subdag,
        )

    return dag_subdag

こうして定義したサブDAGはメインのDAGから参照できます:

airflow/example_dags/example_subdag_operator.py
import datetime

from airflow import DAG
from airflow.example_dags.subdags.subdag import subdag
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator

DAG_NAME = 'example_subdag_operator'

with DAG(
    dag_id=DAG_NAME,
    default_args={"retries": 2},
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval="@once",
    tags=['example'],
) as dag:

    start = EmptyOperator(
        task_id='start',
    )

    section_1 = SubDagOperator(
        task_id='section-1',
        subdag=subdag(DAG_NAME, 'section-1', dag.default_args),
    )

    some_other_task = EmptyOperator(
        task_id='some-other-task',
    )

    section_2 = SubDagOperator(
        task_id='section-2',
        subdag=subdag(DAG_NAME, 'section-2', dag.default_args),
    )

    end = EmptyOperator(
        task_id='end',
    )

    start >> section_1 >> some_other_task >> section_2 >> end

メインのDAGのグラフ表示でSubDagOperatorにズームインすると、サブDAGに含まれるタスクを確認することができます:
image.png

サブDAGを使用するときに役立つその他の情報:

  • 慣例により、サブDAGのdag_idの先頭には親DAGの名前とドットをつけるべきです(parent.child)。
  • メインのDAGとサブDAGとの間で引数(arguments)を共有すべきです(先程のサンプルコードで示したように)。
  • サブDAGはスケジュールを持ち有効化されていなくてはなりません。サブDAGのスケジュールが Noneもしくは@onceの場合、サブDAGは何もせず先に進んでしまいます。
  • SubDagOperator をクリアすると、その内部のタスクの状態もクリアされます。
  • SubDagOperatorを成功とマークしても、その内部のタスクの状態には反映されません。
  • サブDAGの内部のタスクで depends_on_pastを利用するのはやめましょう。混乱を招きます。
  • サブDAGを実行するエグゼキューターを指定できます。サブDAGを一連の処理の途中で、並列度数を1に制限しつつ実行する場合、SequentialExecutorの使用が一般的です。LocalExecutorを使用すると、ワーカーがキャパシティ超過となり、単一スロットで複数タスクが実行される問題が生じる可能性があります。

airflow/example_dagsで実際の動きを確認できます。

注意
SubDagOperatorは並列度数の制限に従いません。このためあなたが設定した制限を超えたリソース消費を引き起こす可能性があります。


便利ではあるもののいろいろ使い方に注意が必要そうですね。。
そして本題からは逸れますが、Pythonのような動的型付け言語を土台とするAirflowが「DAGを返すファクトリーメソッド」を検知する仕組みはどのようになっているのでしょうか。。。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?