1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

AiFlowと DAG内でできるタスク生成について超ざっくり書いてみる

Posted at

AirFlowとは?

一言で表すと 実行タスクのワークフロー管理プラットフォームです。
対応言語はPythonのみです。(2023/07時点)

「DAG」と呼ばれる、AirFlow上でPythonコードを実行するための書き方を使用してます。
もう少し詳しく書くと、
Pythonコード内で処理タスクを生成し、各タスクに依存関係を持たせます。
AirFlowはワークフローを管理してるので、「DAG」で定義されたタスクを順番に実行している、ということです^^

タスクの生成

処理したい内容によって、動的にタスクを生成、静的にタスクを生成することを使い分けましょう。

静的にタスクを生成したい場合

主に日次で動かすチェック処理や、スナップショット作成や、簡単な単体テストで用いることが多いです。
 ・処理の内容が固定で決まっている場合
 ・処理するデータが単一の場合

動的にタスクを生成したい場合

外部から複数データが連携される場合や、洗い替えDBの登録データ数に応じてなど、ユーザー起因で処理が実行されるケースで用いることが多いです。
 ・処理の内容が固定ではなく変動的である場合
 ・処理するデータが複数の場合
 

AirFlow Operatorを使用しよう

DAG内でタスクを生成する際には、AirFlow Operatorを使用します。

AirFlow Operatorとは、

DAG内で定義できるOperatorです。
処理の用途ごとに様々なOperatorがあります。
ベースとなってくる基本OperatorはBaseOperatorです。

多くのOperatorはBaseOperatorを継承しており、再帰的にメソッドの使用が可能です。

使用できるパラメーターや値などは公式ドキュメントをご参考ください。
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html

イメージとしては以下のような構成となります。
image.png

依存関係を定義

DAG内にタスクを生成をしたら、どの順番でタスクを実行するのか定義します。

例)

# 依存関係

Check_task >> Data_Delete_task >> Data_Insert_task >> Create_File_task >> done_all_task

タスクとタスクの間は>>で繋げます。

またタスクの依存関係を定義する際にIF文を使用することも可能です。

例)

# 分岐結果による依存関係

# ファイルカウント数が0件以上の場合
IF file_count > 0:
    Data_Insert_task >> Create_File_task >> done_all_task

# ファイルカウント数が0件の場合
IF file_count = 0:
    done_all_task


依存関係を定義したら実際にAirFlowにアップロードして動作を確認しましょう。

以上がAirFlowDAGでできるタスク生成の大まかな流れとなります。
AirFlowDAGについてこれから少しずつ記事を書いていこうかと思います。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?