34
24

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ワークフローエンジン Apache Airflowの理解を深める

Posted at

本記事は、オープンソースのワークフローエンジンである Apache Airflow の説明と実際のハンズオンを記載する。
AWS では、マネージドサービスとして 2020年11月に Amazon Managed Workflow for Apache Airflow がリリースされたが、そちらは本記事に併せて今後投稿予定。

#Apache Airflowとは
Airflowは、2014年にAirbnb社が開発したオープンソースであり、2016年より Apache財団となる。開発言語は Pythonで、ワークフローエンジンに該当する。

Airflowは、予め決められた順序を基に、処理を実行するワークフローをプログラムで作成する。また、スケジュールや監視を行う事が可能。
ワークフローはタスクの有向非巡回グラフ(DAG)を作成する事により、タスクを実行する。

DAGとは

DAGとは有効非巡回グラフの事で、定義された処理(タスク)をトポロジカルソートによって整理し、ワークフローとして表現するものである。
重要なのは、エッジ(線)のベクトルが有向で、さらに一度通ったノード(点)には戻ってこれないという非巡回性の特性を持つ事である。
image.png

##アーキテクチャ
Apache Airflow を理解するために、アーキテクチャを理解していく。

Apache AirFlow Component

AirFlowは主に3つの役割をがコンポーネントとして含まれる。

  • Webserver : 画面表示部
  • Scheduler : 実行スケジュール管理部
  • Executer : 実行部

taskを実行するExecuter、実行スケジュールを管理するScheduler、WebUIでブラウザ上に表示を行うWebserverが存在する。
コンポーネントと大まかな順序は以下のようなイメージ。管理DBを介して 実行スケジュールや依存関係、実行結果を共有する。
image.png

Apache AirFlow プログラムアーキテクチャ

Airflow のプログラムは以下のように大きく3つに区分される。

  • DAG : どのような順序で実行されるかを記述したプログラム
  • Operator : プログラムを実行するためのテンプレート
  • Task : 実行する処理

Task の定義に Operator が記述されており、Python で処理を実行するための PythonOperator Bash 実行のための BashOperator、他各種 RDBMS や Hive、AWS や GCP など 様々なサービスの APIをコールする Operator が用意されている。
image.png

サンプルコードから処理を見てみる.


dag = DAG(                              //定義
    ‘tutorial’,                         // DAG 名: tutorial
    default_args=default_args,          // パラメーターをデフォルト引数で指定
    schedule_interval=timedelta(days=1) // 実行スケジュール間隔を 1日分として指定
)
t1 = BashOperator(                      // task : t1 を BashOperator で定義
    task_id=‘print_date’,               // task_id を print_date 
    bash_command=‘date’,                // bash  date コマンドの実行
    dag=dag,                            // どの DAG を実行するか定義
)
t2 = BashOperator(                      // task : t2 を BashOperator で定義
task_id='sleep’,                        // task_id を sleep
    depends_on_past=False,              // 指定したタスクの上流タスクの実行が失敗した場合、タスクを実行するかどうかを設定 #False 実行しない
    bash_command=‘sleep 5’,             // bash sleep 5 の実行
    dag=dag,
)

t2.set_upstream(t1)                     // 実行順序を記載、# t1実行完了後に t2 を実行

###その他機能

その他 Airflow の機能については以下が存在する。

  • Connection:各種データストアへの接続情報を管理
  • Hook:Connection を使ってデータストアにアクセスしたり、データをload/dumpするためのメソッド
  • Pools:タスクの並列数を管理
  • Queue:Celeryのような、外部のキューイングシステムをジョブキューとして利用可能
  • Branching:DAG中での条件分岐を実現
  • SLA:一定時間内に成功しなかったtaskを管理者にメール通知

#導入してみる

導入を行う手順は以下とし、taskのステータスが処理状況が確認出来れば完了とする。

  • Dockerコンテナ(Docker for Windows)に Airflowのリポジトリを追加し、WebServerのUIを起動させる。
  • ブラウザから Airflowの表示画面を確認出来たら、DAG の作成、Operator、Task を定義する。
  • 作成したファイルから再度 Webserverを立ち上げ、DAGが作成されている事を確認し、画面上から Taskの実行を試みる。
  • Taskの処理状況を確認する。

##手順

コンテナ上に Airflow のリポジトリ を pullして起動し、ブラウザ上で起動してみたところ、以下のような UIとなる。

> docker pull puckel/docker-airflow
Status: Downloaded newer image for puckel/docker-airflow:latest

> docker run -p 8080:8080 --name airflow puckel/docker-airflow
exec ..

image.png

Airflow リポジトリのプロジェクトルートに dags ディレクトリを作成し
$AIRFLOW_HOME/dags 配下に sample.py を配置

述部は割愛するが定義は以下を記載

#DAGの生成 (定義情報) DAG名: first_dag 
#DAGに紐づくタスクの生成  タスク名: t1,t2,t3,t4#タスク間の依存関係を定義
#処理順序  t1の後に t2とt3 を実行、t2・t3 の後にt4が実行
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])

リトライ

Airflow webserver

first_dag という DAG が 生成されている事が確認できる。
image.png

また、first_dag を選択すると、Taskが可視化される

実行順序は以下となっている
① t1 実行
② t2 および t3 の実行
③ t2 および t3 の実行が完了した後に t4 の実行
image.png

実行形式は以下となる
Toggleを onにクリック (定刻実行 )
Trigger Dagをクリック (即時実行)

image.png

実行状況は taskフローから確認する事が出来た。
image.png

34
24
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
34
24

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?