AirFlowとは?
一言で表すと 実行タスクのワークフロー管理プラットフォームです。
対応言語はPythonのみです。(2023/07時点)
「DAG」と呼ばれる、AirFlow上でPythonコードを実行するための書き方を使用してます。
もう少し詳しく書くと、
Pythonコード内で処理タスクを生成し、各タスクに依存関係を持たせます。
AirFlowはワークフローを管理してるので、「DAG」で定義されたタスクを順番に実行している、ということです^^
タスクの生成
処理したい内容によって、動的にタスクを生成、静的にタスクを生成することを使い分けましょう。
静的にタスクを生成したい場合
主に日次で動かすチェック処理や、スナップショット作成や、簡単な単体テストで用いることが多いです。
・処理の内容が固定で決まっている場合
・処理するデータが単一の場合
動的にタスクを生成したい場合
外部から複数データが連携される場合や、洗い替えDBの登録データ数に応じてなど、ユーザー起因で処理が実行されるケースで用いることが多いです。
・処理の内容が固定ではなく変動的である場合
・処理するデータが複数の場合
AirFlow Operatorを使用しよう
DAG内でタスクを生成する際には、AirFlow Operatorを使用します。
AirFlow Operatorとは、
DAG内で定義できるOperatorです。
処理の用途ごとに様々なOperatorがあります。
ベースとなってくる基本OperatorはBaseOperator
です。
多くのOperatorはBaseOperatorを継承しており、再帰的にメソッドの使用が可能です。
使用できるパラメーターや値などは公式ドキュメントをご参考ください。
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html
依存関係を定義
DAG内にタスクを生成をしたら、どの順番でタスクを実行するのか定義します。
例)
# 依存関係
Check_task >> Data_Delete_task >> Data_Insert_task >> Create_File_task >> done_all_task
タスクとタスクの間は>>
で繋げます。
またタスクの依存関係を定義する際にIF文を使用することも可能です。
例)
# 分岐結果による依存関係
# ファイルカウント数が0件以上の場合
IF file_count > 0:
Data_Insert_task >> Create_File_task >> done_all_task
# ファイルカウント数が0件の場合
IF file_count = 0:
done_all_task
依存関係を定義したら実際にAirFlowにアップロードして動作を確認しましょう。
以上がAirFlowDAGでできるタスク生成の大まかな流れとなります。
AirFlowDAGについてこれから少しずつ記事を書いていこうかと思います。