本記事は、オープンソースのワークフローエンジンである Apache Airflow の説明と実際のハンズオンを記載する。
AWS では、マネージドサービスとして 2020年11月に Amazon Managed Workflow for Apache Airflow がリリースされたが、そちらは本記事に併せて今後投稿予定。
#Apache Airflowとは
Airflowは、2014年にAirbnb社が開発したオープンソースであり、2016年より Apache財団となる。開発言語は Pythonで、ワークフローエンジンに該当する。
Airflowは、予め決められた順序を基に、処理を実行するワークフローをプログラムで作成する。また、スケジュールや監視を行う事が可能。
ワークフローはタスクの有向非巡回グラフ(DAG)を作成する事により、タスクを実行する。
DAGとは
DAGとは有効非巡回グラフの事で、定義された処理(タスク)をトポロジカルソートによって整理し、ワークフローとして表現するものである。
重要なのは、エッジ(線)のベクトルが有向で、さらに一度通ったノード(点)には戻ってこれないという非巡回性の特性を持つ事である。
##アーキテクチャ
Apache Airflow を理解するために、アーキテクチャを理解していく。
Apache AirFlow Component
AirFlowは主に3つの役割をがコンポーネントとして含まれる。
- Webserver : 画面表示部
- Scheduler : 実行スケジュール管理部
- Executer : 実行部
taskを実行するExecuter、実行スケジュールを管理するScheduler、WebUIでブラウザ上に表示を行うWebserverが存在する。
コンポーネントと大まかな順序は以下のようなイメージ。管理DBを介して 実行スケジュールや依存関係、実行結果を共有する。
Apache AirFlow プログラムアーキテクチャ
Airflow のプログラムは以下のように大きく3つに区分される。
- DAG : どのような順序で実行されるかを記述したプログラム
- Operator : プログラムを実行するためのテンプレート
- Task : 実行する処理
Task の定義に Operator が記述されており、Python で処理を実行するための PythonOperator Bash 実行のための BashOperator、他各種 RDBMS や Hive、AWS や GCP など 様々なサービスの APIをコールする Operator が用意されている。
サンプルコードから処理を見てみる.
dag = DAG( //定義
‘tutorial’, // DAG 名: tutorial
default_args=default_args, // パラメーターをデフォルト引数で指定
schedule_interval=timedelta(days=1) // 実行スケジュール間隔を 1日分として指定
)
t1 = BashOperator( // task : t1 を BashOperator で定義
task_id=‘print_date’, // task_id を print_date
bash_command=‘date’, // bash date コマンドの実行
dag=dag, // どの DAG を実行するか定義
)
t2 = BashOperator( // task : t2 を BashOperator で定義
task_id='sleep’, // task_id を sleep
depends_on_past=False, // 指定したタスクの上流タスクの実行が失敗した場合、タスクを実行するかどうかを設定 #False 実行しない
bash_command=‘sleep 5’, // bash sleep 5 の実行
dag=dag,
)
t2.set_upstream(t1) // 実行順序を記載、# t1実行完了後に t2 を実行
###その他機能
その他 Airflow の機能については以下が存在する。
- Connection:各種データストアへの接続情報を管理
- Hook:Connection を使ってデータストアにアクセスしたり、データをload/dumpするためのメソッド
- Pools:タスクの並列数を管理
- Queue:Celeryのような、外部のキューイングシステムをジョブキューとして利用可能
- Branching:DAG中での条件分岐を実現
- SLA:一定時間内に成功しなかったtaskを管理者にメール通知
#導入してみる
導入を行う手順は以下とし、taskのステータスが処理状況が確認出来れば完了とする。
- Dockerコンテナ(Docker for Windows)に Airflowのリポジトリを追加し、WebServerのUIを起動させる。
- ブラウザから Airflowの表示画面を確認出来たら、DAG の作成、Operator、Task を定義する。
- 作成したファイルから再度 Webserverを立ち上げ、DAGが作成されている事を確認し、画面上から Taskの実行を試みる。
- Taskの処理状況を確認する。
##手順
コンテナ上に Airflow のリポジトリ を pullして起動し、ブラウザ上で起動してみたところ、以下のような UIとなる。
> docker pull puckel/docker-airflow
Status: Downloaded newer image for puckel/docker-airflow:latest
> docker run -p 8080:8080 --name airflow puckel/docker-airflow
exec ..
Airflow リポジトリのプロジェクトルートに dags ディレクトリを作成し
$AIRFLOW_HOME/dags 配下に sample.py を配置
述部は割愛するが定義は以下を記載
#DAGの生成 (定義情報) DAG名: first_dag
#DAGに紐づくタスクの生成 タスク名: t1,t2,t3,t4#タスク間の依存関係を定義
#処理順序 t1の後に t2とt3 を実行、t2・t3 の後にt4が実行
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])
リトライ
Airflow webserver
first_dag という DAG が 生成されている事が確認できる。
また、first_dag を選択すると、Taskが可視化される
実行順序は以下となっている
① t1 実行
② t2 および t3 の実行
③ t2 および t3 の実行が完了した後に t4 の実行
実行形式は以下となる
Toggleを onにクリック (定刻実行 )
Trigger Dagをクリック (即時実行)