Airflow には Backfill (直訳: 埋め戻し)という機能があり、airflow dags backfill
で、過去の特定の期間について、DAG を改めて実行することができるが、この backfill コマンドの挙動に関して、少し理解が難しかった部分があったのでメモ。
公式ドキュメント
参考にさせていただいた記事
backfill コマンドの使用タイミング
公式ドキュメントに書いてある通りだが、例えば、start_date
が 2019-11-10
の DAG について、別途 2019-10-01
~ 2019-10-31
のデータを処理する必要がある場合など、指定された過去の期間について DAG を実行したい場合に使用する。
backfill コマンドの実行方法
基本的に start-date
, end-date
の2つのオプションを指定する。
airflow dags backfill ${dag_id} \
--start-date ${期間の開始日} \
--end-date ${期間の終了日}
Cloud Composer で Backfill をしたい場合は以下のように gcloud
コマンド経由で実行。
gcloud composer environments run ${Composer環境名} \
--location ${Composer環境のLocation} \
dags -- backfill ${dag_id} \
--start-date ${期間の開始日} \
--end-date ${期間の終了日}
start_date, end_date への値の渡し方による挙動の違い
タイムゾーンの部分でつまずきやすいので注意。
DAG の準備
まず以下のような DAG: backfill_test
を考える
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
def _get_logical_date(**kwargs):
jst_tz = pendulum.timezone("Asia/Tokyo")
# logical_date を取得してJST変換
logical_date = kwargs['logical_date'].in_timezone(jst_tz)
print(f'logical_date: {logical_date}')
with DAG(
dag_id='backfill_test',
start_date=pendulum.tomorrow(tz='Asia/Tokyo'),
schedule_interval='@daily',
catchup=False,
) as dag:
get_logical_date = PythonOperator(
task_id='get_logical_date',
python_callable=_get_logical_date,
)
やっている内容は、DAG の logical_date
を print
するだけ。
-
logical_date
は UTC で格納されているので JST 変換している
ポイントは、start_date
を JST で定義 していること。
つまり、DAG は毎日、日本時間の 0:00 に実行される。
-
@daily
が 0:00 実行を表す
とりあえず実行
logical_date が 2024-06-01
, 2024-06-02
の2日分において Backfill を実行したいとする。
その場合、公式ドキュメント通り書けばコマンドは以下のようになる。
airflow dags backfill backfill_test \
--start-date '2024-06-01' \
--end-date '2024-06-02'
しかし、このコマンドを実行すると、1日分しか実行されない。
具体的には logical_date
が 2024-06-02
分の DAG しか実行されず、2024-06-01
分は実行されない。
TASK: get_logical_date
の出力(一部抜粋)。
INFO - logical_date: 2024-06-02T00:00:00+09:00
原因と対応
上記のような挙動になったのは、start_date
, end_date
に単に時間を渡すと、Airflow はそれを UTC として解釈 するのが原因。
つまり、
--start-date '2024-06-01' \
--end-date '2024-06-02'
という引数は、日本時間(JST)に直すと以下のように解釈される。
-
start-date
: 2024-06-01 09:00:00 (JST) -
end-date
: 2024-06-02 09:00:00 (JST)
そのため、「日本時間の 2024-06-01 09:00 から 2024-06-02 09:00 までの期間で実行されるはずだった DAG を実行(Backfill)」という意味になる。
そして、DAG の起動時間は 0:00 (JST)なので、指定された期間に入るのは logical_date
が 2024-06-02
の 0:00 (JST) 分の DAG のみとなる。
では start_date
など、時間関連の DAG の設定値を JST で設定している場合どうすれば良いかというと、start_date
, end_date
に JST の時間を渡せば意図通りになる。
# さっき実行した DAG run は delete している
airflow dags backfill backfill_test \
--start-date '2024-06-01 00:00:00+09:00' \
--end-date '2024-06-02 00:00:00+09:00'
すると、意図した通り2つの DAG が実行された。
2024-06-01 分の DAG
TASK: get_logical_date
の出力(一部抜粋)。
INFO - logical_date: 2024-06-01T00:00:00+09:00
2024-06-02 分の DAG
TASK: get_logical_date
の出力(一部抜粋)。
INFO - logical_date: 2024-06-02T00:00:00+09:00
Bacfill 対象の TASK を絞る
DAG 内の全ての TASK を実行せず、特定の TASK のみ Backfill したい場合のメモ。
DAG の準備
DAG: backfill_test
を以下のように修正する。
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
def _get_logical_date(**kwargs):
jst_tz = pendulum.timezone("Asia/Tokyo")
# logical_date を取得してJST変換
logical_date = kwargs['logical_date'].in_timezone(jst_tz)
print(f'logical_date: {logical_date}')
with DAG(
dag_id='backfill_test',
start_date=pendulum.tomorrow(tz='Asia/Tokyo'),
schedule_interval='@daily',
catchup=False,
) as dag:
get_logical_date = PythonOperator(
task_id='get_logical_date',
python_callable=_get_logical_date,
)
echo_hello = BashOperator(
task_id='echo_hello',
bash_command='echo "hello"',
)
echo_world = BashOperator(
task_id='echo_world',
bash_command='echo "world"',
)
get_logical_date >> echo_hello
get_logical_date >> echo_world
get_logical_date
の後に echo_hello
と echo_world
が並列に呼び出される。
task-regex を使った TASK を絞った実行
Bacfill において、対象 TASK を絞りたい場合、task-regex
オプションを使う。
公式ドキュメント より抜粋
-t, --task-regex
The regex to filter specific task_ids (optional)
ポイントは、Upstream(上流)の TASK も実行対象となるということ。
例えば、task-regex
に echo_world
を指定して実行しみてみる。
# さっき実行した DAG run は delete している
airflow dags backfill backfill_test \
--start-date '2024-06-01 00:00:00+09:00' \
--end-date '2024-06-02 00:00:00+09:00' \
--task-regex "echo_world"
すると、echo_world
だけでなく、上流の get_logical_date
も実行された
echo_world
のみを実行したい場合は、--ignore-dependencies
オプションを指定する。
# さっき実行した DAG run は delete している
airflow dags backfill backfill_test \
--start-date '2024-06-01 00:00:00+09:00' \
--end-date '2024-06-02 00:00:00+09:00' \
--task-regex "echo_world" \
--ignore-dependencies
結果、echo_world
のみ実行された。
過去に実行した DAG run がある場合の挙動
今まで、コマンド実行前に「さっき実行した DAG run は delete している」と記載していたが、DAG run を削除しないまま Backfill をかけたらどうなるのかのメモ。
現状は、echo_world
のみ 2024-06-01
, 2024-06-02
分の実行が Success となっている状態。
Backfill 実行
とりあえず DAG 全体に対して Backfill を実行してみる。
airflow dags backfill backfill_test \
--start-date '2024-06-01 00:00:00+09:00' \
--end-date '2024-06-02 00:00:00+09:00' \
すると、echo_world
は実行されず、get_logical_date
, echo_hello
のみ実行される。
つまり、すでに成功している TASK は Backfill がかからない。
過去の DAG run の状況に関係なく Backfill をかける
全 TASK で Backfill をかけたい場合 、reset-dagruns
オプションを使う。
公式ドキュメント より抜粋
--reset-dagruns
if set, the backfill will delete existing backfill-related DAG runs and start anew with fresh, running DAG runs
つまり、以下のようにコマンドを実行すれば、過去の DAG run の状況に関係なく全ての DAG で Backfill がかかる。
airflow dags backfill backfill_test \
--start-date '2024-06-01 00:00:00+09:00' \
--end-date '2024-06-02 00:00:00+09:00' \
--reset-dagruns
もちろん、task-regex
との組み合わせも可能
airflow dags backfill backfill_test \
--start-date '2024-06-01 00:00:00+09:00' \
--end-date '2024-06-02 00:00:00+09:00' \
--task-regex "echo_world" \
--ignore-dependencies \
--reset-dagruns
こうすることで、過去の DAG run の状況に関係なく echo_world のみ Backfill がかかる。