恥ずかしながら最近になって知ったワークフローエンジン Apache Airflow。日本語の紹介記事もちらほら出てきていますが、公式ドキュメントをちょっとずつ抄訳しながら読んでいこうと思います。
10回目の今回はDAGの再実行(Re-run DAG)。
バージョン2.3.3時点のものです。
DAGの再実行(Re-run DAG)
あるDAGを再度実行したいということがあります。例えばスケジュール実行されたDAGがエラーとなった場合などです。
キャッチアップ(Catchup)
start_date
、end_date
、そしてschedule_interval
により、一連のデータ区間(Data Interval)が定義されます。スケジューラーはこの区間に基づきDAG Runを生成して実行していきます。スケジューラーは未実行の(あるいはクリアされている)データ区間のDAG Runをすべて実行します。これをキャッチアップと呼びます。
DAGがキャッチアップを処理するように記述されていない場合(つまりデータ区間に属するデータを処理するのでなく、実行されたまさにその時点に属するデータを処理するようなDAGの場合)、キャッチアップ機能をオフにする必要があります。これはDAGの設定でcatchup = False
とするか、構成ファイルでcatchup_by_default = False
とすることで可能です。キャッチアップ機能をオフにすると、スケジューラーは最新のデータ区間に対応するDAG Runのみを生成して実行します。
"""
このコードは次の場所に置かれたAirflowチュートリアルに含まれています:
https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial.py
"""
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
import datetime
import pendulum
dag = DAG(
"tutorial",
default_args={
"depends_on_past": True,
"retries": 1,
"retry_delay": datetime.timedelta(minutes=3),
},
start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
description="A simple tutorial DAG",
schedule_interval="@daily",
catchup=False,
)
上記の例では、もしDAGがスケジューラーにより2016-01-02の午前6時に処理された場合(あるいはコマンドラインから手動で実行を指示された場合も)、2016-01-01 00:00から2016-01-02 00:00前までの丸一日分のデータ区間を対象にして単一のDAG Runが生成されます。
もしdag.catchup
にTrue
が設定されていたら、スケジューラーは2015-12-01 00:00から2016-01-02 00:00前までのあいだの32個のデータ区間のそれぞれについてDAG Runを生成し、そしてスケジューラーはそれらをシーケンシャルに実行していたことでしょう。
キャッチアップ機能は当該のDAGが一定期間無効化されその後有効化された場合にも起動します。
このスケジューラーの動作は期間ごとに分割がたやすくできるデータセットの処理に最適です。DAGが内部的に別の仕組みを用いてキャッチアップを実行する場合はこの機能をオフにしましょう。
埋め戻し(Backfill)
すでに過去のものとなった特定の期間について、DAGを改めて実行したいということがありえます。例えばあるDAGのstart_dateが 2019-11-21だったとします。start_dateはこのDAGについて生成される一連のデータ区間のうち最初のデータ区間の開始日時となります。ところが何らかの事情で1ヶ月前の2019-10-21の日時でこのDAGを実行する必要がでてきたとします。このような処理を埋め戻し(Backfill)といいます。
キャッチアップはできなくても埋め戻しはできます。CLIを使って次のようなコマンドを実行します:
airflow dags backfill \
--start-date START_DATE \
--end-date END_DATE \
dag_id
backfillコマンド は開始日と終了日のあいだにあるすべてのデータ区間について再実行を行います。
タスクの再実行
スケジュール実行中にいくつかのタスクがエラーとなることもあるでしょう。エラーの原因を解消したあと、「タスクをクリアする」ことで当該タスクを再実行できます。タスクのクリアはタスク・インスタンスの削除とは違います。そうではなく、タスク・インスタンスのmax_tries
値を0
に、状態をNone
に更新するのです。これによりタスクが再実行されます。
ツリーもしくはグラフ・ビューでエラーとなったタスクをクリックし、続いて「Clear」をクリックします。するとエグゼキューターがこのタスクを再実行します。
再実行に際して複数の選択肢が用意されています。
- Past – DAGの最新のデータ区間より前に実行されたタスク・インスタンスすべて
- Future – DAGの最新のデータ区間より後に実行されるタスク・インスタンスすべて
- Upstream – DAG内の上流タスクすべて
- Downstream – DAG内の下流タスクすべて
- Recursive – 子DAGと親DAGに含まれるタスクすべて
- Failed – DAGの最新の実行結果に含まれるエラーとなったタスクのみ
タスクのクリアもCLIからコマンド操作で行うことができます:
airflow tasks clear dag_id \
--task-regex task_regex \
--start-date START_DATE \
--end-date END_DATE
このコマンドはdag_id
が指し示すDAG Runについて、正規表現のマッチするすべてのタスク・インスタンスをクリアします。他のオプションについては、clearコマンドのヘルプで確認できます。
airflow tasks clear --help
今回はかなり意訳をしました。行間を読んだり単語を読み替えたり。。。
それにしてもタスクの再実行のときの選択肢として示されているPastやFutureの意味がいまいちわかりません。。