Apache Airflow で自分なりに試行錯誤したことを忘れないように記録する。
ご指摘歓迎
環境
AirflowはVersion:1.10.10
OSはCentOS Linux release 8.1.1911
DBは組み込みのSQLite
wait_for_downstreamのTrue/Falseに関する動作確認
事の発端
チュートリアルを参考にして、タスクrun_this_firstとrun_this_lastを実行させることを試したいと考えた。
実行したところ、脳内での結果と実際の結果が異なった。
# -*- coding: utf-8 -*-
from builtins import range
from datetime import timedelta
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
args = {
'owner': 'Airflow',
'start_date': days_ago(2),
'depends_on_past': True,
}
dag = DAG(
dag_id='bash_operator4',
default_args=args,
schedule_interval=timedelta(days=1),
catchup=False,
)
run_this_first = BashOperator(
task_id='run_this_first',
bash_command='echo 0 >> /tmp/hogehoge ',
dag=dag,
)
run_this_last = BashOperator(
task_id='run_this_last',
bash_command='echo 1 >> /tmp/hogehoge ',
dag=dag,
)
run_this_first >> run_this_last
airflow backfill bash_operator4 -s 2015-06-1 -e 2015-06-03
DAGの処理が3回実行されるはず、つまり結果は以下ではないか。
0
1
0
1
0
1
結果は以下の通り、DAG内の処理はrun_this_first >> run_this_lastの順になっているが3回分の処理については
1回目、2回目、3回目の順となっていない。
0
0
1
0
1
1
調査
Airflowのドキュメントを調査した。
結果、こちらのwait_for_downstreamを発見。
上記コードのargsに下記コードを追記することで脳内の結果と実際の結果が一致した。
'wait_for_downstream': True,
もう少し気になったことと考察
結果としてargs部分にdepends_on_pastとwait_for_downstreamを設定したが、そのTrue/Falseの組み合わせで挙動が変わるかを調査した。
結果は以下の通り。(出力に工夫すればよかったと後悔した。)
wait_for_downstream=True, depends_on_past=True/False
wait_for_downstreamがTrueの場合はdepends_on_pastのTrue/Falseに関わらず、2回目の処理は1回目の処理終了を、3回目の処理は2回目の処理終了を待つので理解できる。
wait_for_downstream=Flase, depends_on_past=True
depends_on_pastがTrueの場合は前タスクインスタンスの成功に依存するため、以下の動きと理解した(ログより推測)。
なおqueueの設定はデフォルト
出力値 | 推測動作 |
---|---|
0 | 1回目のrun_this_first処理 |
0 | 1回目のrun_this_first処理成功を受けて 2回目のrun_this_first処理がdequeue |
1 | 1回目のrun_this_last処理後 1回目のrun_this_last処理がdequeue |
0 | 2回目のrun_this_first処理成功を受けて 3回目のrun_this_first処理がdequeue |
1 | 2回目のrun_this_last処理後 1回目のrun_this_last処理がdequeue |
1 | 3回目のrun_this_last処理後 1回目のrun_this_last処理がdequeue |
wait_for_downstream=Flase, depends_on_past=Flase
depends_on_pastがFlaseの場合は前タスクインスタンスの成功に依存しないため1回目、2回目、3回目のrun_this_first処理が先に処理されると理解した。
その他
引き続き触って動きの理解と調査を行う。