1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Apache Airflowのwait_for_downstreamに関する挙動(を自分なりに考えた)

Last updated at Posted at 2020-05-21

Apache Airflow で自分なりに試行錯誤したことを忘れないように記録する。

ご指摘歓迎

環境

AirflowはVersion:1.10.10
OSはCentOS Linux release 8.1.1911
DBは組み込みのSQLite

wait_for_downstreamのTrue/Falseに関する動作確認

事の発端

チュートリアルを参考にして、タスクrun_this_firstとrun_this_lastを実行させることを試したいと考えた。
実行したところ、脳内での結果と実際の結果が異なった。

試したコード

# -*- coding: utf-8 -*-

from builtins import range
from datetime import timedelta

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
    'depends_on_past': True,
}

dag = DAG(
    dag_id='bash_operator4',
    default_args=args,
    schedule_interval=timedelta(days=1),
    catchup=False,
)

run_this_first = BashOperator(
    task_id='run_this_first',
    bash_command='echo 0 >> /tmp/hogehoge ',
    dag=dag,
)

run_this_last = BashOperator(
    task_id='run_this_last',
    bash_command='echo 1 >> /tmp/hogehoge ',
    dag=dag,
)

run_this_first >> run_this_last
実行したコマンド
airflow backfill bash_operator4 -s 2015-06-1 -e 2015-06-03

DAGの処理が3回実行されるはず、つまり結果は以下ではないか。

コマンド実行前の脳内での/tmp/hogehoge出力
0
1
0
1
0
1

結果は以下の通り、DAG内の処理はrun_this_first >> run_this_lastの順になっているが3回分の処理については
1回目、2回目、3回目の順となっていない。

実際の/tmp/hogehoge出力
0
0
1
0
1
1

調査

Airflowのドキュメントを調査した。
結果、こちらのwait_for_downstreamを発見。

上記コードのargsに下記コードを追記することで脳内の結果と実際の結果が一致した。

追加コード
'wait_for_downstream': True,

もう少し気になったことと考察

結果としてargs部分にdepends_on_pastとwait_for_downstreamを設定したが、そのTrue/Falseの組み合わせで挙動が変わるかを調査した。
結果は以下の通り。(出力に工夫すればよかったと後悔した。)

image.png

wait_for_downstream=True, depends_on_past=True/False

wait_for_downstreamがTrueの場合はdepends_on_pastのTrue/Falseに関わらず、2回目の処理は1回目の処理終了を、3回目の処理は2回目の処理終了を待つので理解できる。

wait_for_downstream=Flase, depends_on_past=True

depends_on_pastがTrueの場合は前タスクインスタンスの成功に依存するため、以下の動きと理解した(ログより推測)。
なおqueueの設定はデフォルト

出力値 推測動作
0 1回目のrun_this_first処理
0 1回目のrun_this_first処理成功を受けて
2回目のrun_this_first処理がdequeue
1 1回目のrun_this_last処理後
1回目のrun_this_last処理がdequeue
0 2回目のrun_this_first処理成功を受けて
3回目のrun_this_first処理がdequeue
1 2回目のrun_this_last処理後
1回目のrun_this_last処理がdequeue
1 3回目のrun_this_last処理後
1回目のrun_this_last処理がdequeue

wait_for_downstream=Flase, depends_on_past=Flase

depends_on_pastがFlaseの場合は前タスクインスタンスの成功に依存しないため1回目、2回目、3回目のrun_this_first処理が先に処理されると理解した。

その他

引き続き触って動きの理解と調査を行う。

1
0
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?