LoginSignup
2
2

More than 1 year has passed since last update.

Airflow資料抄訳(5):制御フロー

Last updated at Posted at 2022-07-11

恥ずかしながら最近になって知ったワークフローエンジン Apache Airflow。日本語の紹介記事もちらほら出てきていますが、公式ドキュメントをちょっとずつ抄訳しながら読んでいこうと思います。

5回目の今回は制御フロー(Control Flow)。
バージョン2.3.3時点のものです。


制御フロー

デフォルトでは、あるタスクはそれが依存するタスクがすべて成功したときのみ実行されます。この挙動を変えるための方法はいくつか存在します:

  • 分岐(Branching)、任意の条件に基づき実行するタスクを選択します。
  • 最新日時でのみ実行(Latest Only)、分岐の特殊形態で、タスクを含むDAGが現在時刻で実行中の場合のみ当該タスクを実行します。
  • 過去に依存(Depends On Past)、タスクの実行を当該タスクの前回の実行記録に依拠して決定します。
  • トリガールール(Trigger Rules)、DAG配下にある当該タスクを実行する条件を設定できます。

分岐(Branching)

分岐を使用することで、あるタスクのすべての下流タスクを実行する代わりに、1つ以上のパスを選択的に実行することができます。

BranchPythonOperator はPythonOperatorに似ていますが、python_callabletask_id(もしくはtask_ids)を返す点で異なります。返されたIDのタスクのパスは実行されますが、それ以外のパスはすべてスキップされます。Noneを返すことですべての下流タスクをスキップすることも出来ます。

Python関数から返されたtask_idBranchPythonOperatorタスクの直接の下流タスクのIDである必要があります。

注意
あるタスクがBranchingOpratorタスクの下流タスクであると同時に同じBranchingOperatorタスクに後続する他の選択されたタスクの下流タスクでもあるという場合、当該タスクはスキップされません:
image.png
図中には枝分かれしたパスのタスクとして、branch_ajoin、そしてbranch_bがあります。joinbranch_aの下流タスクなので、Python関数が返したタスクIDにjoinが含まれていなかったとしても実行されます。

BranchPythonOperatorはXComsとともに使用することで上流タスク群の情報に基づいて動的に実行タスクを決めることができます。例えばこうです:

def branch_func(ti):
    xcom_value = int(ti.xcom_pull(task_ids="start_task"))
    if xcom_value >= 5:
        return "continue_task"
    elif xcom_value >= 3:
        return "stop_task"
    else:
        return None


start_op = BashOperator(
    task_id="start_task",
    bash_command="echo 5",
    xcom_push=True,
    dag=dag,
)

branch_op = BranchPythonOperator(
    task_id="branch_task",
    python_callable=branch_func,
    dag=dag,
)

continue_op = EmptyOperator(task_id="continue_task", dag=dag)
stop_op = EmptyOperator(task_id="stop_task", dag=dag)

start_op >> branch_op >> [continue_op, stop_op]

枝分かれ機能を持つ独自のオペレーターを実装するためにBaseBranchOperatorを継承したクラスを作成することができます。このクラスの挙動はBranchPythonOperatorに似ていますが、choose_branchメソッドの実装を提供する必要があります。

BranchPythonOperatorのコールバック関数と同じく、当該メソッドも下流タスクのIDもしくはそのリストを返します。IDが指し示すタスクは実行され、それ以外はスキップされます。Noneを返すことですべての下流タスクをスキップすることも出来ます:

class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        """
        月初は特別な枝線も実行する
        """
        if context['data_interval_start'].day == 1:
            return ['daily_task_id', 'monthly_task_id']
        elif context['data_interval_start'].day == 2:
            return 'daily_task_id'
        else:
            return None

最新日時でのみ実行(Latest Only)

DAGは現在日付と異なる日付で実行されることがあります。例えば、とあるDAGを前月分の一部データを補完するために実行するような場合です。

このような場合でもDAGの一部もしくは全部を過去日付の下で実行したくないということがあります。このようなときにはLatestOnlyOperatorを使用します。

この特殊なオペレーターはDAGが過去日付で実行されている場合(現在の実時刻がexecution_time と次回実行予定のexecution_timeの間にあり、かつ、外部からトリガーされた実行でない場合)に下流のすべてのタスクをスキップします。

こちらに例を示します:

airflow/example_dags/example_latest_only_with_trigger.py
import datetime

import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id='latest_only_with_trigger',
    schedule_interval=datetime.timedelta(hours=4),
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=['example3'],
) as dag:
    latest_only = LatestOnlyOperator(task_id='latest_only')
    task1 = EmptyOperator(task_id='task1')
    task2 = EmptyOperator(task_id='task2')
    task3 = EmptyOperator(task_id='task3')
    task4 = EmptyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE)

    latest_only >> task1 >> [task3, task4]
    task2 >> [task3, task4]

このようなDAG宣言のもとでは:

  • task1latest_only の直接の下流タスクであり、最新時刻以外での実行ではスキップされます。
  • task2latest_only から完全に独立しており、いつでも実行されます。
  • task3task1task2 の下流タスクであり、トリガールールのデフォルトがall_successなので task1に連座して実行するしないが決まります。
  • task3 もまたtask1task2 の下流タスクですが、スキップされません。というのも trigger_ruleall_doneが指定されているからです。

image.png

過去に依存する(Depends On Past)

あるタスクについて前回のDAG実行時に成功していた時のみ当該タスクを実行したいことがあります。このようなときは、当該タスクのdepends_on_past 引数にTrueを指定します。

DAGを初めて実行するとき、とりわけ初めての自動実行のとき、前回の実行記録がない状態でも当該タスクは実行されます。

トリガールール(Trigger Rules)

デフォルトでは、あるタスクの上流のタスクがすべて成功しないと、当該タスクは実行されません。
しかしこれはデフォルトの挙動に過ぎず、タスクに対してtrigger_rule 引数を使用することでこれを制御できます。この引数に指定できる値は:

  • all_success (デフォルト):すべての上流タスクが成功したとき
  • all_failed:すべての上流タスクがfailed状態もしくはupstream_failed状態のとき
  • all_done:すべての上流タスクが完了したとき
  • all_skipped:すべての上流タスクがskipped 状態のとき
  • one_failed:少なくとも1つの上流タスクが失敗したとき(すべての上流タスクが完了するのを待たない)
  • one_success:少なくとも1つの上流タスクが成功したとき(すべての上流タスクが完了するのを待たない)
  • none_failed:上流タスクに failed状態やupstream_failed状態のものが含まれないとき──これはつまりすべての上流タスクが成功したかスキップされたとき
  • none_failed_min_one_success:上流タスクにfailed 状態やupstream_failed状態のものが含まれず、しかも少なくとも1つの上流タスクが成功したとき
  • none_skippedskipped状態のものが含まれないとき──これはつまりすべての上流タスクがsuccessfailedもしくはupstream_failedのいずれの状態でもないとき
  • always:あらゆる依存性なし、このタスクはいつでも実行される

トリガールールは「過去に依存する」機能と組み合わせることが出来ます。

注意
トリガールールとスキップされたタスクの間の関係性に注意してください。分岐の下流でall_successall_failedを使おうと思うことはほぼないでしょう。
トリガールールall_successall_failedが指定されたタスクにはスキップがカスケードします。結果それらは実行されません。次に示すDAGについて考えてみてください:

dags/branch_without_trigger.py
import pendulum

from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator

dag = DAG(
    dag_id="branch_without_trigger",
    schedule_interval="@once",
    start_date=pendulum.datetime(2019, 2, 28, tz="UTC"),
)

run_this_first = EmptyOperator(task_id="run_this_first", dag=dag)
branching = BranchPythonOperator(
    task_id="branching", dag=dag, python_callable=lambda: "branch_a"
)

branch_a = EmptyOperator(task_id="branch_a", dag=dag)
follow_branch_a = EmptyOperator(task_id="follow_branch_a", dag=dag)

branch_false = EmptyOperator(task_id="branch_false", dag=dag)

join = EmptyOperator(task_id="join", dag=dag)

run_this_first >> branching
branching >> branch_a >> follow_branch_a >> join
branching >> branch_false >> join

joinfollow_branch_abranch_false双方の下流にあります。joinタスクにはデフォルトのトリガールールとしてall_successが設定されており、分岐で生じたスキップはall_successが指定されたタスクに対してカスケードするので、このタスクはスキップされます。

image.png

joinタスクのトリガールールにnone_failed_min_one_successを設定することで、意図した動作をするように改めることが出来ます:

image.png


「スキップはデフォルトで下流にカスケードする」というのは直感的なようで要注意なポイントだなと感じました。
ワークフローを構成するとき、ある系列の一部だけ条件次第で実行しないタスクが含まれるというのは、よくあるケースのように思えるためです。
理屈の上ではカスケードは自然なことですが、トリガールールの概念とともにきちんと理解しておく必要があります。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2