0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Airflow の backfill コマンドの挙動確認

Last updated at Posted at 2024-06-13

Airflow には Backfill (直訳: 埋め戻し)という機能があり、airflow dags backfill で、過去の特定の期間について、DAG を改めて実行することができるが、この backfill コマンドの挙動に関して、少し理解が難しかった部分があったのでメモ。

公式ドキュメント

参考にさせていただいた記事

backfill コマンドの使用タイミング

公式ドキュメントに書いてある通りだが、例えば、start_date2019-11-10 の DAG について、別途 2019-10-01 ~ 2019-10-31 のデータを処理する必要がある場合など、指定された過去の期間について DAG を実行したい場合に使用する。

backfill コマンドの実行方法

基本的に start-date, end-date の2つのオプションを指定する。

airflow dags backfill ${dag_id} \
  --start-date ${期間の開始日} \
  --end-date ${期間の終了日}

Cloud Composer で Backfill をしたい場合は以下のように gcloud コマンド経由で実行。

gcloud composer environments run ${Composer環境名} \
--location  ${Composer環境のLocation} \
dags -- backfill ${dag_id} \
  --start-date ${期間の開始日} \
  --end-date ${期間の終了日}

start_date, end_date への値の渡し方による挙動の違い

タイムゾーンの部分でつまずきやすいので注意。

DAG の準備

まず以下のような DAG: backfill_test を考える


import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator


def _get_logical_date(**kwargs):
    jst_tz = pendulum.timezone("Asia/Tokyo")
    # logical_date を取得してJST変換
    logical_date = kwargs['logical_date'].in_timezone(jst_tz)
    print(f'logical_date: {logical_date}')

with DAG(
        dag_id='backfill_test',
        start_date=pendulum.tomorrow(tz='Asia/Tokyo'),
        schedule_interval='@daily',
        catchup=False,
) as dag:
    get_logical_date = PythonOperator(
        task_id='get_logical_date',
        python_callable=_get_logical_date,
    )

やっている内容は、DAG の logical_dateprint するだけ。

  • logical_date は UTC で格納されているので JST 変換している

ポイントは、start_dateJST で定義 していること。
つまり、DAG は毎日、日本時間の 0:00 に実行される。

  • @daily が 0:00 実行を表す

とりあえず実行

logical_date が 2024-06-01, 2024-06-02 の2日分において Backfill を実行したいとする。
その場合、公式ドキュメント通り書けばコマンドは以下のようになる。

airflow dags backfill backfill_test  \
  --start-date '2024-06-01' \
  --end-date '2024-06-02'

しかし、このコマンドを実行すると、1日分しか実行されない。

image.png

具体的には logical_date2024-06-02 分の DAG しか実行されず、2024-06-01 分は実行されない。
TASK: get_logical_date の出力(一部抜粋)。

INFO - logical_date: 2024-06-02T00:00:00+09:00

原因と対応

上記のような挙動になったのは、start_date, end_date に単に時間を渡すと、Airflow はそれを UTC として解釈 するのが原因。
つまり、

--start-date '2024-06-01' \
--end-date '2024-06-02'

という引数は、日本時間(JST)に直すと以下のように解釈される。

  • start-date: 2024-06-01 09:00:00 (JST)
  • end-date: 2024-06-02 09:00:00 (JST)

そのため、「日本時間の 2024-06-01 09:00 から 2024-06-02 09:00 までの期間で実行されるはずだった DAG を実行(Backfill)」という意味になる。
そして、DAG の起動時間は 0:00 (JST)なので、指定された期間に入るのは logical_date2024-06-02 の 0:00 (JST) 分の DAG のみとなる。

では start_date など、時間関連の DAG の設定値を JST で設定している場合どうすれば良いかというと、start_date, end_date に JST の時間を渡せば意図通りになる。

# さっき実行した DAG run は delete している
airflow dags backfill backfill_test  \
  --start-date '2024-06-01 00:00:00+09:00' \
  --end-date '2024-06-02 00:00:00+09:00'

すると、意図した通り2つの DAG が実行された。

2024-06-01 分の DAG

image.png

TASK: get_logical_date の出力(一部抜粋)。

INFO - logical_date: 2024-06-01T00:00:00+09:00

2024-06-02 分の DAG

image.png

TASK: get_logical_date の出力(一部抜粋)。

INFO - logical_date: 2024-06-02T00:00:00+09:00

Bacfill 対象の TASK を絞る

DAG 内の全ての TASK を実行せず、特定の TASK のみ Backfill したい場合のメモ。

DAG の準備

DAG: backfill_test を以下のように修正する。

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
 
 
def _get_logical_date(**kwargs):
    jst_tz = pendulum.timezone("Asia/Tokyo")
    # logical_date を取得してJST変換
    logical_date = kwargs['logical_date'].in_timezone(jst_tz)
    print(f'logical_date: {logical_date}')
 
with DAG(
        dag_id='backfill_test',
        start_date=pendulum.tomorrow(tz='Asia/Tokyo'),
        schedule_interval='@daily',
        catchup=False,
) as dag:
    get_logical_date = PythonOperator(
        task_id='get_logical_date',
        python_callable=_get_logical_date,
    )
 
    echo_hello = BashOperator(
        task_id='echo_hello',
        bash_command='echo "hello"',
    )
 
    echo_world = BashOperator(
        task_id='echo_world',
        bash_command='echo "world"',
    )
 
    get_logical_date >> echo_hello
    get_logical_date >> echo_world

get_logical_date の後に echo_helloecho_world が並列に呼び出される。

スクリーンショット 2024-06-13 12.27.49.png

task-regex を使った TASK を絞った実行

Bacfill において、対象 TASK を絞りたい場合、task-regex オプションを使う。

公式ドキュメント より抜粋

-t, --task-regex

The regex to filter specific task_ids (optional)

ポイントは、Upstream(上流)の TASK も実行対象となるということ。
例えば、task-regexecho_world を指定して実行しみてみる。

# さっき実行した DAG run は delete している
airflow dags backfill backfill_test  \
  --start-date '2024-06-01 00:00:00+09:00' \
  --end-date '2024-06-02 00:00:00+09:00' \
  --task-regex "echo_world"

すると、echo_world だけでなく、上流の get_logical_date も実行された

image.png

echo_world のみを実行したい場合は、--ignore-dependencies オプションを指定する。

# さっき実行した DAG run は delete している
airflow dags backfill backfill_test  \
  --start-date '2024-06-01 00:00:00+09:00' \
  --end-date '2024-06-02 00:00:00+09:00' \
  --task-regex "echo_world" \
  --ignore-dependencies

結果、echo_world のみ実行された。

スクリーンショット 2024-06-13 12.28.47.png

過去に実行した DAG run がある場合の挙動

今まで、コマンド実行前に「さっき実行した DAG run は delete している」と記載していたが、DAG run を削除しないまま Backfill をかけたらどうなるのかのメモ。

現状は、echo_world のみ 2024-06-01, 2024-06-02 分の実行が Success となっている状態。

スクリーンショット 2024-06-13 12.28.47.png

Backfill 実行

とりあえず DAG 全体に対して Backfill を実行してみる。

airflow dags backfill backfill_test  \
  --start-date '2024-06-01 00:00:00+09:00' \
  --end-date '2024-06-02 00:00:00+09:00' \

すると、echo_world は実行されず、get_logical_date, echo_hello のみ実行される
つまり、すでに成功している TASK は Backfill がかからない。

過去の DAG run の状況に関係なく Backfill をかける

全 TASK で Backfill をかけたい場合 、reset-dagruns オプションを使う。

公式ドキュメント より抜粋

--reset-dagruns

if set, the backfill will delete existing backfill-related DAG runs and start anew with fresh, running DAG runs

つまり、以下のようにコマンドを実行すれば、過去の DAG run の状況に関係なく全ての DAG で Backfill がかかる。

airflow dags backfill backfill_test  \
  --start-date '2024-06-01 00:00:00+09:00' \
  --end-date '2024-06-02 00:00:00+09:00' \
  --reset-dagruns

もちろん、task-regex との組み合わせも可能

airflow dags backfill backfill_test  \
  --start-date '2024-06-01 00:00:00+09:00' \
  --end-date '2024-06-02 00:00:00+09:00' \
  --task-regex "echo_world" \
  --ignore-dependencies \
   --reset-dagruns

こうすることで、過去の DAG run の状況に関係なく echo_world のみ Backfill がかかる。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?