はじめに
インティメート・マージャーではワークフロー管理ツールであるAirflowを活用しています。
今回は、Airflowを触り始めの初心者の方に向けて、実務上必要になることの多いスケジューリング周りの理解の助けになるような記事を書いていきたいと思います。
Airflowは公式ドキュメントが豊富でありソースコードも公開されているので、より詳しい内容について知りたい方はそちらを参照するのもおすすめです。
本記事は、Airflow2.x系を想定していますが、Airflow1.x系も対応したものになっています。
DAG定義と実行スケジュール
公式ドキュメントからDAG定義を拝借して解説していきます。
with DAG(
"my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule="@daily", catchup=False
) as dag:
op = EmptyOperator(task_id="task")
DAGの実行スケジュールは、DAGクラスコンストラクタのstart_dateおよびschedule(Airflow 2.3以前はschedule_interval)引数により決定づけられます。
上記の場合、pendulum.datetime(2021, 1, 1, tz="UTC")から@dailyつまり1日ごとにDAGの実行がスケジュールされる(以降DAG Runが作成されると言い換えます)ということになります。
ここでそれぞれのDAG Runはいつ作成されるでしょうか。
この場合は、2021/1/1 00:00:00のDAG Runは2021/1/2 00:00:00以降に作成されます。
直感的には1日遅れているように思いますが、以下の図のように考えるとわかりやすいと思います。
つまり次の実行までの1日分のデータ(2021/1/1 00:00:00 ~ 2021/1/1 23:59:59)が蓄積されたタイミングでDAG Runが作成される = DAGが実行されるということです。
また、実際には0 1,2 * * *のような等間隔ではないCron表現を利用することもあるかもしれません。
その場合でも考え方は同様で、次の実行までのデータが蓄積されたタイミングでDAG Runが作成されます。つまり2023-01-01 01:00:00のDAG Runは2023-01-01 02:00:00に、2023-01-01 02:00:00のDAG Runは2023-01-02 01:00:00に作成されます。
Airflow2.3以降ではTimetableを実装することでより柔軟なスケジューリングが可能です。
https://airflow.apache.org/docs/apache-airflow/2.3.4/howto/timetable.html
日時に関する変数
理解を進めるためにここでDAGの実行に関するオブジェクトについて触れていきます。
| オブジェクト | 意味 |
|---|---|
| Task | 何を行うかの定義。PythonコードでOperatorやSensorなどをインスタンス化して実装される。 |
| Task Instance | Taskの実行ごとに作成され、ステータス等の属性を持つ。dag_id, task_id, logical_date の組み合わせでユニーク。 |
| DAG Run | DAGの実行ごとに作成され、ステータス等の属性を持つ。dag_id, logical_dateでユニーク。 |
また、各オブジェクトが持つdatetime型の属性について見てみます。
| オブジェクト | 属性 | 意味 |
|---|---|---|
| Task | start_date |
スケジュールをいつから行うかの設定値 |
| Task Instance, DAG Run |
logical_date (execution_date) |
各実行を一意に識別するための値(DAG Runが作成される時間とは異なる) |
| Task Instance, DAG Run | start_date |
各実行の実際の開始時間 |
ここで伝えたいことは以下の二点です。
一つ目は、start_dateという属性はオブジェクトにより意味合いが異なるので注意が必要ということです。
Task InstanceおよびDAG Runは実際の実行開始時間の記録であることに対して、Taskのstart_dateは設定値でありDAG定義時のstart_dateパラメータに一致します。
二つ目は、Task InstanceおよびDAG Runにはlogical_dateという、各実行を区別するような識別子としての属性があるということです。こちらは図1の処理すべきデータの開始時間に一致します。
つまり、DAG定義時のstart_dateはこのlogical_dateの最小値を指定すると考えるといいでしょう。
おわりに
Airflowのスケジューリングに関して、基本的な考え方と注意点について説明しました。
Airflowは使い始めは直感に反する動作をすると思うことがあるかもしれませんが、コンセプトを理解することで正しく使いこなせるはずです。
この記事が、DAGのスケジューリングで疑問が湧いた時の助けになれば幸いです。
明日は、Docker関連の記事についての記事だそうです!
ぜひお楽しみに!
