はじめに
Airflowでは、いくつかの方法でタスクの分岐をさせることが可能です。
何かしたの値を受けて、次のいずれかの処理のみを実行させたいときなどに便利ですよね!
今回はBranchPythonOperator
を使用しようしたタスク分岐の方法と、分岐したタスクを再度結合し、その後の処理を行う方法についてまとめていきます。
実行環境
- AWS MWAA環境 (Airflowバージョン2.2.2)
やってみる
今回は以下の手順で進めていきます。
- BranchPythonOperatorの基本的な使い方
- タスクの結合
BranchPythonOperatorの基本的な使い方
まずはBranchPythonOperator
の基本的な使い方です。
以下のようにpython_callable
に分岐させるための関数を渡し、関数側で次のタスクのtask_idを返してやることで簡単に分岐させることができます。
以下の例では、branch
タスクからnext_true_task
とnext_false_task
に分岐させています。branch_function
の結果が常にnext_true_task
を返すので、next_true_task
のみが実行されるはずです。
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
def branch_function(**kwargs):
if True:
return "next_true_task"
else:
return "next_false_task"
with DAG(
"branch_test",
) as dag:
start = DummyOperator(
task_id="start",
dag=dag,
)
branch = BranchPythonOperator(
task_id="branch",
python_callable=branch_function,
dag=dag,
)
t1 = DummyOperator(task_id="next_true_task")
t2 = DummyOperator(task_id="next_false_task")
start >> branch >> [t1, t2]
AirflowUIのGraph Viewを見ると以下のようにbranch
タスクからnext_true_task
とnext_false_task
に分岐しているのが分かります。
また実際に実行すると、以下のようにnext_false_task
がskipped
(ピンク色)のステータスになりました。問題なさそうです。
タスクの結合
タスクの分岐はBranchPythonOperator
によって簡単に行えましたが、その後再度タスクを合流させ処理を続けたいことがあります。
Graph Viewで見ると以下のような流れですね。新たにjoin
、finish
タスクを置きました。
join
、finish
タスクはどちらもDummyOperator
によるタスクで、中身は空です。
#
#
#
join = DummyOperator(
task_id="join"
)
finish = DummyOperator(
task_id="finish"
)
start >> branch >> [t1, t2] >> join >> finish
これで問題なさそうなのですが、このまま実行すると最後まで実行されず、以下のようにjoin
、finish
タスクまでスキップされてしまいました。
実は各オペレーターの基底クラスであるBaseOperator
にはtrigger_rule
という変数があり、以下のような変数になっています。
"""
:param trigger_rule: defines the rule by which dependencies are applied
for the task to get triggered. Options are:
``{ all_success | all_failed | all_done | one_success |
one_failed | none_failed | none_failed_min_one_success | none_skipped | always}``
default is ``all_success``. Options can be set as string or
using the constants defined in the static class
``airflow.utils.TriggerRule``
:type trigger_rule: str
"""
上流のタスクに自分自身の実行がどのように依存するかを設定するためのもので、全て成功した場合のみのall_success
や全て失敗した場合のみのall_failed
など、いくつかの設定値に変更することができます。
そして、これのデフォルト値はall_success
になっており、上流の全てのタスクが成功しなければなりません。
trigger_ruleの各詳細についてはドキュメントの以下にまとまっています。
これまでに実装したDAGでは、join
タスクの上流のnext_true_task
とnext_false_task
はどちらかのみしか実行されず、両方が成功することはありません。そのために、join
以降のタスクも実行されずスキップされてしまったのでした。
そこで、join
タスクを以下のように変更します
#
#
join = DummyOperator(
task_id="join",
trigger_rule="none_failed_min_one_success",
)
#
#
none_failed_min_one_success
は上流のタスクがひとつも失敗せず、かつ少なくとも一つは成功した場合にトリガーされます。
それでは、DAGを実行してみましょう。
以下のように、join
、finish
までタスクが実行されることが確認できました。
まとめ
今回は、Airflowにおけるタスクの分岐と結合のやり方をまとめました。
方法自体は単純ですが、ドキュメントにきちんと目を通していかないとハマってしまうポイントかもしれません。
しかし、タスクの分岐、結合など、フローを柔軟に組めるようになると、できることが増えて楽しいですね!
それでは今回はここまで!