はじめに
Airflowでは、いくつかの方法でタスクの分岐をさせることが可能です。
何かしたの値を受けて、次のいずれかの処理のみを実行させたいときなどに便利ですよね!
今回はBranchPythonOperatorを使用しようしたタスク分岐の方法と、分岐したタスクを再度結合し、その後の処理を行う方法についてまとめていきます。
実行環境
- AWS MWAA環境 (Airflowバージョン2.2.2)
やってみる
今回は以下の手順で進めていきます。
- BranchPythonOperatorの基本的な使い方
- タスクの結合
BranchPythonOperatorの基本的な使い方
まずはBranchPythonOperatorの基本的な使い方です。
以下のようにpython_callableに分岐させるための関数を渡し、関数側で次のタスクのtask_idを返してやることで簡単に分岐させることができます。
以下の例では、branchタスクからnext_true_taskとnext_false_taskに分岐させています。branch_functionの結果が常にnext_true_taskを返すので、next_true_taskのみが実行されるはずです。
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
def branch_function(**kwargs):
if True:
return "next_true_task"
else:
return "next_false_task"
with DAG(
"branch_test",
) as dag:
start = DummyOperator(
task_id="start",
dag=dag,
)
branch = BranchPythonOperator(
task_id="branch",
python_callable=branch_function,
dag=dag,
)
t1 = DummyOperator(task_id="next_true_task")
t2 = DummyOperator(task_id="next_false_task")
start >> branch >> [t1, t2]
AirflowUIのGraph Viewを見ると以下のようにbranchタスクからnext_true_taskとnext_false_taskに分岐しているのが分かります。
また実際に実行すると、以下のようにnext_false_taskがskipped(ピンク色)のステータスになりました。問題なさそうです。
タスクの結合
タスクの分岐はBranchPythonOperatorによって簡単に行えましたが、その後再度タスクを合流させ処理を続けたいことがあります。
Graph Viewで見ると以下のような流れですね。新たにjoin、finishタスクを置きました。
join、finishタスクはどちらもDummyOperatorによるタスクで、中身は空です。
#
#
#
join = DummyOperator(
task_id="join"
)
finish = DummyOperator(
task_id="finish"
)
start >> branch >> [t1, t2] >> join >> finish
これで問題なさそうなのですが、このまま実行すると最後まで実行されず、以下のようにjoin、finishタスクまでスキップされてしまいました。
実は各オペレーターの基底クラスであるBaseOperatorにはtrigger_ruleという変数があり、以下のような変数になっています。
"""
:param trigger_rule: defines the rule by which dependencies are applied
for the task to get triggered. Options are:
``{ all_success | all_failed | all_done | one_success |
one_failed | none_failed | none_failed_min_one_success | none_skipped | always}``
default is ``all_success``. Options can be set as string or
using the constants defined in the static class
``airflow.utils.TriggerRule``
:type trigger_rule: str
"""
上流のタスクに自分自身の実行がどのように依存するかを設定するためのもので、全て成功した場合のみのall_successや全て失敗した場合のみのall_failedなど、いくつかの設定値に変更することができます。
そして、これのデフォルト値はall_successになっており、上流の全てのタスクが成功しなければなりません。
trigger_ruleの各詳細についてはドキュメントの以下にまとまっています。
これまでに実装したDAGでは、joinタスクの上流のnext_true_taskとnext_false_taskはどちらかのみしか実行されず、両方が成功することはありません。そのために、join以降のタスクも実行されずスキップされてしまったのでした。
そこで、joinタスクを以下のように変更します
#
#
join = DummyOperator(
task_id="join",
trigger_rule="none_failed_min_one_success",
)
#
#
none_failed_min_one_successは上流のタスクがひとつも失敗せず、かつ少なくとも一つは成功した場合にトリガーされます。
それでは、DAGを実行してみましょう。
以下のように、join、finishまでタスクが実行されることが確認できました。
まとめ
今回は、Airflowにおけるタスクの分岐と結合のやり方をまとめました。
方法自体は単純ですが、ドキュメントにきちんと目を通していかないとハマってしまうポイントかもしれません。
しかし、タスクの分岐、結合など、フローを柔軟に組めるようになると、できることが増えて楽しいですね!
それでは今回はここまで!




