恥ずかしながら最近になって知ったワークフローエンジン Apache Airflow。日本語の紹介記事もちらほら出てきていますが、公式ドキュメントをちょっとずつ抄訳しながら読んでいこうと思います。
5回目の今回は制御フロー(Control Flow)。
バージョン2.3.3時点のものです。
制御フロー
デフォルトでは、あるタスクはそれが依存するタスクがすべて成功したときのみ実行されます。この挙動を変えるための方法はいくつか存在します:
- 分岐(Branching)、任意の条件に基づき実行するタスクを選択します。
- 最新日時でのみ実行(Latest Only)、分岐の特殊形態で、タスクを含むDAGが現在時刻で実行中の場合のみ当該タスクを実行します。
- 過去に依存(Depends On Past)、タスクの実行を当該タスクの前回の実行記録に依拠して決定します。
- トリガールール(Trigger Rules)、DAG配下にある当該タスクを実行する条件を設定できます。
分岐(Branching)
分岐を使用することで、あるタスクのすべての下流タスクを実行する代わりに、1つ以上のパスを選択的に実行することができます。
BranchPythonOperator
はPythonOperatorに似ていますが、python_callable
がtask_id
(もしくはtask_ids
)を返す点で異なります。返されたIDのタスクのパスは実行されますが、それ以外のパスはすべてスキップされます。None
を返すことですべての下流タスクをスキップすることも出来ます。
Python関数から返されたtask_id
はBranchPythonOperator
タスクの直接の下流タスクのIDである必要があります。
BranchPythonOperator
はXComsとともに使用することで上流タスク群の情報に基づいて動的に実行タスクを決めることができます。例えばこうです:
def branch_func(ti):
xcom_value = int(ti.xcom_pull(task_ids="start_task"))
if xcom_value >= 5:
return "continue_task"
elif xcom_value >= 3:
return "stop_task"
else:
return None
start_op = BashOperator(
task_id="start_task",
bash_command="echo 5",
xcom_push=True,
dag=dag,
)
branch_op = BranchPythonOperator(
task_id="branch_task",
python_callable=branch_func,
dag=dag,
)
continue_op = EmptyOperator(task_id="continue_task", dag=dag)
stop_op = EmptyOperator(task_id="stop_task", dag=dag)
start_op >> branch_op >> [continue_op, stop_op]
枝分かれ機能を持つ独自のオペレーターを実装するためにBaseBranchOperator
を継承したクラスを作成することができます。このクラスの挙動はBranchPythonOperator
に似ていますが、choose_branch
メソッドの実装を提供する必要があります。
BranchPythonOperator
のコールバック関数と同じく、当該メソッドも下流タスクのIDもしくはそのリストを返します。IDが指し示すタスクは実行され、それ以外はスキップされます。None
を返すことですべての下流タスクをスキップすることも出来ます:
class MyBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
"""
月初は特別な枝線も実行する
"""
if context['data_interval_start'].day == 1:
return ['daily_task_id', 'monthly_task_id']
elif context['data_interval_start'].day == 2:
return 'daily_task_id'
else:
return None
最新日時でのみ実行(Latest Only)
DAGは現在日付と異なる日付で実行されることがあります。例えば、とあるDAGを前月分の一部データを補完するために実行するような場合です。
このような場合でもDAGの一部もしくは全部を過去日付の下で実行したくないということがあります。このようなときにはLatestOnlyOperator
を使用します。
この特殊なオペレーターはDAGが過去日付で実行されている場合(現在の実時刻がexecution_time
と次回実行予定のexecution_time
の間にあり、かつ、外部からトリガーされた実行でない場合)に下流のすべてのタスクをスキップします。
こちらに例を示します:
import datetime
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule
with DAG(
dag_id='latest_only_with_trigger',
schedule_interval=datetime.timedelta(hours=4),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example3'],
) as dag:
latest_only = LatestOnlyOperator(task_id='latest_only')
task1 = EmptyOperator(task_id='task1')
task2 = EmptyOperator(task_id='task2')
task3 = EmptyOperator(task_id='task3')
task4 = EmptyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE)
latest_only >> task1 >> [task3, task4]
task2 >> [task3, task4]
このようなDAG宣言のもとでは:
-
task1
はlatest_only
の直接の下流タスクであり、最新時刻以外での実行ではスキップされます。 -
task2
はlatest_only
から完全に独立しており、いつでも実行されます。 -
task3
はtask1
とtask2
の下流タスクであり、トリガールールのデフォルトがall_success
なのでtask1
に連座して実行するしないが決まります。 -
task3
もまたtask1
とtask2
の下流タスクですが、スキップされません。というのもtrigger_rule
でall_done
が指定されているからです。
過去に依存する(Depends On Past)
あるタスクについて前回のDAG実行時に成功していた時のみ当該タスクを実行したいことがあります。このようなときは、当該タスクのdepends_on_past
引数にTrue
を指定します。
DAGを初めて実行するとき、とりわけ初めての自動実行のとき、前回の実行記録がない状態でも当該タスクは実行されます。
トリガールール(Trigger Rules)
デフォルトでは、あるタスクの上流のタスクがすべて成功しないと、当該タスクは実行されません。
しかしこれはデフォルトの挙動に過ぎず、タスクに対してtrigger_rule
引数を使用することでこれを制御できます。この引数に指定できる値は:
-
all_success
(デフォルト):すべての上流タスクが成功したとき -
all_failed
:すべての上流タスクがfailed
状態もしくはupstream_failed
状態のとき -
all_done
:すべての上流タスクが完了したとき -
all_skipped
:すべての上流タスクがskipped
状態のとき -
one_failed
:少なくとも1つの上流タスクが失敗したとき(すべての上流タスクが完了するのを待たない) -
one_success
:少なくとも1つの上流タスクが成功したとき(すべての上流タスクが完了するのを待たない) -
none_failed
:上流タスクにfailed
状態やupstream_failed
状態のものが含まれないとき──これはつまりすべての上流タスクが成功したかスキップされたとき -
none_failed_min_one_success
:上流タスクにfailed
状態やupstream_failed
状態のものが含まれず、しかも少なくとも1つの上流タスクが成功したとき -
none_skipped
:skipped
状態のものが含まれないとき──これはつまりすべての上流タスクがsuccess
、failed
もしくはupstream_failed
のいずれの状態でもないとき -
always
:あらゆる依存性なし、このタスクはいつでも実行される
トリガールールは「過去に依存する」機能と組み合わせることが出来ます。
注意
トリガールールとスキップされたタスクの間の関係性に注意してください。分岐の下流でall_success
やall_failed
を使おうと思うことはほぼないでしょう。
トリガールールall_success
やall_failed
が指定されたタスクにはスキップがカスケードします。結果それらは実行されません。次に示すDAGについて考えてみてください:
import pendulum
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
dag = DAG(
dag_id="branch_without_trigger",
schedule_interval="@once",
start_date=pendulum.datetime(2019, 2, 28, tz="UTC"),
)
run_this_first = EmptyOperator(task_id="run_this_first", dag=dag)
branching = BranchPythonOperator(
task_id="branching", dag=dag, python_callable=lambda: "branch_a"
)
branch_a = EmptyOperator(task_id="branch_a", dag=dag)
follow_branch_a = EmptyOperator(task_id="follow_branch_a", dag=dag)
branch_false = EmptyOperator(task_id="branch_false", dag=dag)
join = EmptyOperator(task_id="join", dag=dag)
run_this_first >> branching
branching >> branch_a >> follow_branch_a >> join
branching >> branch_false >> join
join
はfollow_branch_a
とbranch_false
双方の下流にあります。join
タスクにはデフォルトのトリガールールとしてall_success
が設定されており、分岐で生じたスキップはall_success
が指定されたタスクに対してカスケードするので、このタスクはスキップされます。
join
タスクのトリガールールにnone_failed_min_one_success
を設定することで、意図した動作をするように改めることが出来ます:
「スキップはデフォルトで下流にカスケードする」というのは直感的なようで要注意なポイントだなと感じました。
ワークフローを構成するとき、ある系列の一部だけ条件次第で実行しないタスクが含まれるというのは、よくあるケースのように思えるためです。
理屈の上ではカスケードは自然なことですが、トリガールールの概念とともにきちんと理解しておく必要があります。