Airflowなどのワークフローエンジンのメリットに、処理の依存関係を制御出来ることがあります。
この記事では、Airflowにおける依存関係の設定(の一つ)、trigger ruleについて説明します。
trigger ruleとは
タスク間の依存関係のルールです。
Airflowでは、デフォルトでも
task1_1 >> task2
task1_2 >> task2
のように書くと、task1_1とtask1_2の両方が成功した時にtask2を実行するように、タスク間の依存を設定出来ます。
trigger_ruleをタスクに設定することで、
- 両方が失敗
-
いずれかがが成功
など、依存の仕方を指定出来るようになります。
用語
- Operator
- Sensor
- タスク
- タスクインスタンス
- upstream/downstream
設定の仕方
公式ドキュメント例の、
join = DummyOperator(task_id='join', dag=dag, trigger_rule='none_failed')
みたいな感じで、Downstream側のタスクに設定します(BaseOperatorで定義されているので、どのオペレータでも設定出来ます)。
ちなみに、文字列でも定数(airflow.utils.TriggerRule)でも設定出来ます。
ルールの一覧
1.10.5の時点で8種類あります。デフォルトはall_successです。
all_success: (default) all parents have succeeded
all_failed: all parents are in a failed or upstream_failed state
all_done: all parents are done with their execution
one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped
none_skipped: no parent is in a skipped state, i.e. all parents are in a success, failed, or upstream_failed state
dummy: dependencies are just for show, trigger at will
名前で大体わかりますが、具体的な挙動はtirgger_rule_depで確認出来ます。
(なお、all_doneはupstreamのskipもdoneとみなします)
落とし穴
trigger_ruleはタスクが実行されるかを指定しますが、それだけ決まるわけではないです。
「タスクインスタンス動かない…」みたいになる人が多いらしく、ドキュメントにもFAQがあり、
- depends_on_past(前のDagRunのタスクインスタンスが成功に依存)
- pool
- タスクの並列度の設定(concurrency)
- start_date
などの例が書かれています。
ハマった例:Sensorのsoft_fail
FAQに無いハマりどころに、upstreamのタスクインスタンスがdownstreamのタスクインスタンスをskipにするケースがあります。
この場合、downstreamのタスクをtrigger_ruleをall_doneにしても実行されないようです。
私がハマった、Sensorのsoft_failオプションの例です:
- AirflowのSensorは、センサー対象(Pub/Subなど)が条件(具象クラスのpokeメソッド)を満たすまで待ちます
- timeoutオプションが設定されていると、一定時間内に条件を満たさなければタスクインスタンスはfailします
- soft_failオプションがtrueの場合、タイムアウトした場合、タスクインスタンスはskipします
- soft_failオプションはSensor自身だけではなく、downstreamのタスクもskipします
self._do_skip_downstream_tasks(context)