7
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Airflowの実行タスクをBranchPythonOperatorによって分岐させ、結合する方法

Posted at

はじめに

Airflowでは、いくつかの方法でタスクの分岐をさせることが可能です。
何かしたの値を受けて、次のいずれかの処理のみを実行させたいときなどに便利ですよね!
今回はBranchPythonOperatorを使用しようしたタスク分岐の方法と、分岐したタスクを再度結合し、その後の処理を行う方法についてまとめていきます。

実行環境

  • AWS MWAA環境 (Airflowバージョン2.2.2)

やってみる

今回は以下の手順で進めていきます。

  1. BranchPythonOperatorの基本的な使い方
  2. タスクの結合

BranchPythonOperatorの基本的な使い方

まずはBranchPythonOperatorの基本的な使い方です。

以下のようにpython_callableに分岐させるための関数を渡し、関数側で次のタスクのtask_idを返してやることで簡単に分岐させることができます。
以下の例では、branchタスクからnext_true_tasknext_false_taskに分岐させています。branch_functionの結果が常にnext_true_taskを返すので、next_true_taskのみが実行されるはずです。

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

def branch_function(**kwargs):
    if True:
        return "next_true_task"
    else:
        return "next_false_task"

with DAG(
    "branch_test",
) as dag:
    start = DummyOperator(
        task_id="start",
        dag=dag,
    )

    branch = BranchPythonOperator(
        task_id="branch",
        python_callable=branch_function,
        dag=dag,
    )

    t1 = DummyOperator(task_id="next_true_task")
    t2 = DummyOperator(task_id="next_false_task")

    start  >> branch >> [t1, t2]

AirflowUIのGraph Viewを見ると以下のようにbranchタスクからnext_true_tasknext_false_taskに分岐しているのが分かります。

スクリーンショット 2022-04-03 16.38.19.png

また実際に実行すると、以下のようにnext_false_taskskipped(ピンク色)のステータスになりました。問題なさそうです。

スクリーンショット 2022-04-03 16.42.02.png

タスクの結合

タスクの分岐はBranchPythonOperatorによって簡単に行えましたが、その後再度タスクを合流させ処理を続けたいことがあります。

Graph Viewで見ると以下のような流れですね。新たにjoinfinishタスクを置きました。

スクリーンショット 2022-04-03 16.49.07.png

joinfinishタスクはどちらもDummyOperatorによるタスクで、中身は空です。

    #
    #
    #
    join = DummyOperator(
        task_id="join"
    )
    finish = DummyOperator(
        task_id="finish"
    )

    start  >> branch >> [t1, t2] >> join >> finish

これで問題なさそうなのですが、このまま実行すると最後まで実行されず、以下のようにjoinfinishタスクまでスキップされてしまいました。

スクリーンショット 2022-04-03 16.51.17.png

実は各オペレーターの基底クラスであるBaseOperatorにはtrigger_ruleという変数があり、以下のような変数になっています。

"""
    :param trigger_rule: defines the rule by which dependencies are applied
        for the task to get triggered. Options are:
        ``{ all_success | all_failed | all_done | one_success |
        one_failed | none_failed | none_failed_min_one_success | none_skipped | always}``
        default is ``all_success``. Options can be set as string or
        using the constants defined in the static class
        ``airflow.utils.TriggerRule``
    :type trigger_rule: str
"""

上流のタスクに自分自身の実行がどのように依存するかを設定するためのもので、全て成功した場合のみのall_successや全て失敗した場合のみのall_failedなど、いくつかの設定値に変更することができます。
そして、これのデフォルト値はall_successになっており、上流の全てのタスクが成功しなければなりません。

trigger_ruleの各詳細についてはドキュメントの以下にまとまっています。

これまでに実装したDAGでは、joinタスクの上流のnext_true_tasknext_false_taskはどちらかのみしか実行されず、両方が成功することはありません。そのために、join以降のタスクも実行されずスキップされてしまったのでした。

そこで、joinタスクを以下のように変更します

    #
    #
    join = DummyOperator(
        task_id="join",
        trigger_rule="none_failed_min_one_success",
    )
    #
    #

none_failed_min_one_successは上流のタスクがひとつも失敗せず、かつ少なくとも一つは成功した場合にトリガーされます。

それでは、DAGを実行してみましょう。
以下のように、joinfinishまでタスクが実行されることが確認できました。

スクリーンショット 2022-04-03 17.10.40.png

まとめ

今回は、Airflowにおけるタスクの分岐と結合のやり方をまとめました。
方法自体は単純ですが、ドキュメントにきちんと目を通していかないとハマってしまうポイントかもしれません。

しかし、タスクの分岐、結合など、フローを柔軟に組めるようになると、できることが増えて楽しいですね!
それでは今回はここまで!

7
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?