7
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Airflowのtrigger ruleとハマりどころ

Posted at

Airflowなどのワークフローエンジンのメリットに、処理の依存関係を制御出来ることがあります。
この記事では、Airflowにおける依存関係の設定(の一つ)、trigger ruleについて説明します。

trigger ruleとは

タスク間の依存関係のルールです。

Airflowでは、デフォルトでも

task1_1 >> task2
task1_2 >> task2

のように書くと、task1_1とtask1_2の両方成功した時にtask2を実行するように、タスク間の依存を設定出来ます。

trigger_ruleをタスクに設定することで、

  • 両方失敗
  • いずれかが成功
    など、依存の仕方を指定出来るようになります。

用語

  • Operator
  • Sensor
  • タスク
  • タスクインスタンス
  • upstream/downstream

設定の仕方

公式ドキュメント例の、


join = DummyOperator(task_id='join', dag=dag, trigger_rule='none_failed')

みたいな感じで、Downstream側のタスクに設定します(BaseOperatorで定義されているので、どのオペレータでも設定出来ます)。
ちなみに、文字列でも定数(airflow.utils.TriggerRule)でも設定出来ます。

ルールの一覧

1.10.5の時点で8種類あります。デフォルトはall_successです。

all_success: (default) all parents have succeeded

all_failed: all parents are in a failed or upstream_failed state

all_done: all parents are done with their execution

one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done

one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done

none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped

none_skipped: no parent is in a skipped state, i.e. all parents are in a success, failed, or upstream_failed state

dummy: dependencies are just for show, trigger at will

名前で大体わかりますが、具体的な挙動はtirgger_rule_depで確認出来ます。
(なお、all_doneはupstreamのskipもdoneとみなします

落とし穴

trigger_ruleはタスクが実行されるかを指定しますが、それだけ決まるわけではないです。

「タスクインスタンス動かない…」みたいになる人が多いらしく、ドキュメントにもFAQがあり、

  • depends_on_past(前のDagRunのタスクインスタンスが成功に依存)
  • pool
  • タスクの並列度の設定(concurrency)
  • start_date

などの例が書かれています。

ハマった例:Sensorのsoft_fail

FAQに無いハマりどころに、upstreamのタスクインスタンスがdownstreamのタスクインスタンスをskipにするケースがあります。
この場合、downstreamのタスクをtrigger_ruleをall_doneにしても実行されないようです。

私がハマった、Sensorのsoft_failオプションの例です:

  1. AirflowのSensorは、センサー対象(Pub/Subなど)が条件(具象クラスのpokeメソッド)を満たすまで待ちます
  2. timeoutオプションが設定されていると、一定時間内に条件を満たさなければタスクインスタンスはfailします
  3. soft_failオプションがtrueの場合、タイムアウトした場合、タスクインスタンスはskipします
  4. soft_failオプションはSensor自身だけではなく、downstreamのタスクもskipします

コード的にはここらへんです。

                    self._do_skip_downstream_tasks(context)

関連リンク

7
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?