LoginSignup
91
79

More than 3 years have passed since last update.

【Airflow】最近よく聞くAirflowに入門!EC2で動かしてみた【CI/CD】

Last updated at Posted at 2019-01-07

はじめに

せっかくの連休なので新しいこと勉強してみたいと思い、reInvent2018にて色々なところで名前が出てきた「Airflow」について触れていきます!

なかなかイケてるという噂なので、「Airflowとは何か」から「EC2での導入」「簡単な操作方法
」までこの記事ではまとめてみようと思います。
最近CI/CD周りに興味があるので勉強して業務に活かせるか検討したいと思います。

-----12/19 追記------
Airflow関連の記事も増えてきたのでリンクしておきます。
Airflowでモデルの学習からデプロイまでをやってみた
Airflowをここ3ヶ月触ってみた
Airflow - データパイプラインのスケジュールと監視をプログラムしてみた

What's Airflow ?

スクリーンショット 2018-12-29 14.51.06.png

Airflow is a platform to programmatically author, schedule and monitor workflows.

Airflowはプログラムでワークフローを作成、スケジュール設定、監視するためのプラットフォームである(公式ドキュメントから引用)

ということで、ETL周りをGUIベースで実行・管理をサポートしてくれるツールのようです。
運用者としては、GUIベースでの設定やダッシュボードによる監視ができるのは嬉しい点ですね。また、DAG(Directed Acyclic Graph)のグラフ理論を用いたワークフロー構築がなされています。DAGは有向非巡回グラフと呼ばれ、ETLタスクを以下の図のようにDAGっぽくつなぎ合わせワークフローを形成するイメージです。
参考:有向非巡回グラフ(DAG)の意味とトポロジカルソート

DAG_tietovarasto.png

歴史としては、Airbnb社で2014年に始まり、2015年にオープンソースとして公開後、2016年にApachからのサポート受け「Apach Airflow」となって今に至っているようです。
Github:https://github.com/apache/incubator-airflow

イケてるポイント

Airflowがイケてると思われるポイントをまとめてみました。この点を実際に使ったあとに評価していきたいと思います。

  • 実行履歴の可視化(GUIで管理可能)
    dags.png

  • 手動or自動によるワークフローのリトライ

  • 並列処理

EC2へ導入

ホストへインストールを試みたところ、pip installではまってしまったため、dockerで環境を持ち込むことにしました。

以下のdockerリポジトリからpullしてきました。
https://hub.docker.com/r/puckel/docker-airflow

sudo docker pull puckel/docker-airflow

pullしてきたら、以下で立ち上げます。

sudo docker run -p 8080:8080 --name airflow puckel/docker-airflow

ログが表示されたらひとまずairflowの起動は完了です。
スクリーンショット 2018-12-30 22.28.21.png

この状態でhttp://(EC2(SGでポート8080解放済み)のIP):8080(/admin/)にアクセスすると
スクリーンショット 2018-12-30 22.30.15.png

airflowの管理者ページに飛ぶことができます。

また、立ち上げたコンテナに対して

sudo docker exec -it airflow /bin/bash

でコンテナに入り、airflow webserverを実行しても同様にログが流れてairflowが起動します。

tutorial

ViewとCode

まず、tutorialをベースにどんな感じになっているかを調査していきたいと思います。
上記のdockerを立ち上げて、ブラウザから管理者ページにアクセスするとサンプル一覧が表示されます。

スクリーンショット 2018-12-31 1.40.15.png

この一番下にtutorialというDAGがあります。
クリックすると、簡単なbash処理が3つ表示されます。

スクリーンショット 2018-12-31 11.50.56.png

ここに表示されているのはtutorailというDAGの中のprint_date,sleep,templatedというtask_idがどのようなワークフローで組まれているかが表示されています。

では、このそれぞれのtask_idがどのように設定されているかをみてみます。Graph Viewがあるメニュー上のCodeをクリックしてみると、tutorialのスクリプトが表示されます。

tutorial.py
# -*- coding: utf-8 -*-

from datetime import timedelta

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

□ airflowにはいくつかOperatorが用意されている

Operatorは簡単に言うと、何で処理を実行するかという設定になります。このスクリプトではbashのみの処理のため、from airflow.operators.bash_operator import BashOperatorをインポートしています。

□ start_dateは必須の設定

start_dateはいつからこのDAGを実行するかという設定になります。ここで一つポイントなのが、過去に設定した場合、その設定した過去から処理が始まるという点です。
例えば、intervalの設定を12時間ごとにしてstart_dateを2日前にしてDAGを実行すると、2日前の12時,24時,1日前の12時,24時の分も実行されます。

tutorial.py
dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag,
)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

t1 >> [t2, t3]

□ DAGで大枠を設定

第一引数がDAG名になります。GUIで表示されます。このDAGの説明をdescriptionで設定できたりします。DAGで一番重要なのがschedule_intervalです。このスクリプトのようにtimedelta(days=1)で1日ごとの設定ができたり 、cronのように* * * * * commandでも設定できます。

□ 細かな処理を定義

今回は3つ定義されています。先ほどのViewで表示されていたprint_date,sleep,templatedがここのtask_idに設定されています。

□ t1 >> [t2, t3]

最後にワークフローを構築します。このスクリプトように直感的にもかけますし、他の書き方あるようです。
参考:https://airflow.incubator.apache.org/tutorial.html
スクリーンショット 2018-12-31 12.17.29.png

ここまでで、管理者ページに表示されていたGraphViewと実際の処理スクリプトとの関係がわかったかと思います。
ただ、このtutorialを実行しても結果がわかりにくいので処理結果がわかりやすいようなDAGを構築してみます。

tutorial_dev

tutorialをベースにした検証用DAG:tutorial_devを構築します。

まず、airflowがどこのスクリプト参照しているかを確認します。
上記のコンテナにはいると、airflow.cfgがあります。このファイルにairflowの設定がまとまっています。

airflow.cfg
[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /usr/local/airflow

# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# This path must be absolute
dags_folder = /usr/local/airflow/dags

# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /usr/local/airflow/logs

上記のdags_folderがairflowが参照しているDAGのフォルダーになります。しかし、普通にコンテナに入るとこのフォルダがないのでdagsを作ります。

dags直下に以下のようなtutorial_dev.pyを作ります。

tutorial_dev.py
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 12, 31),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
}

dag = DAG(
    'tutorial_dev',
    default_args=default_args,
    catchup=False,
    # schedule_interval=timedelta(minutes=3),
)

bash_command = """
    echo output_st_start >> ~/output/test.txt
    date >> ~/output/test.txt
    echo output_st_end >> ~/output/test.txt
"""

t1 = BashOperator(
    task_id='output_start',
    bash_command=bash_command,
    dag=dag,
)

bash_command = """
    echo output1start >> ~/output/test.txt
    date >> ~/output/test.txt
    echo output1end >> ~/output/test.txt
    sleep 3
"""

t2 = BashOperator(
    task_id='output1',
    depends_on_past=False,
    bash_command=bash_command,
    dag=dag,
)

bash_command = """
    echo output2start >> ~/output/test.txt
    date >> ~/output/test.txt
    echo output2end >> ~/output/test.txt
    sleep 3
"""

t3 = BashOperator(
    task_id='output2',
    depends_on_past=False,
    bash_command=bash_command,
    dag=dag,
)

bash_command = """
    echo output_fin_start >> ~/output/test.txt
    date >> ~/output/test.txt
    echo output_fin_end >> ~/output/test.txt
"""

t4 = BashOperator(
    task_id='output_finish',
    depends_on_past=False,
    bash_command=bash_command,
    dag=dag,
)

t1 >> [t2, t3] >> t4

□ 簡単な仕様説明

ワークフローが意図した順序で実行されているかを確認したいため、taskとしてdate >> ~/output/test.txtを定義します。

ここで1つ検証要素を追加します。今回、構築したワークフローt1 >> [t2, t3] >> t4ですがt2t3に分岐したあとt4はどうなると思いますか?個人的な予想は分岐後にt4がそれぞれ実行されると思ってますので、以下の出力結果を予想します。

t1 start
date
t1 end
t2 start
date
t2 end
t3 start
date
t3 end
t4 start
date
t4 end
t4 start
date 
t4 end

後ほど、答え合わせします。

□ DAGスクリプトのテスト

テストするにはpython ~/dags/tutorial_dev.pyを実行します。これでエラーが出なければスクリプト的なエラーがないことを確認することができます。試しにimportしてあるdatetimeを消してテストしてみます。

airflow@d6e6793bc451:~$ python dags/tutorial_dev.py 
Traceback (most recent call last):
  File "dags/tutorial_dev.py", line 11, in <module>
    'start_date': datetime(2018, 12, 31),
NameError: name 'datetime' is not defined

と言うふうに検出してくれます。スクリプト内容が実行されるわけではありません。

□ 外部トリガー実行

今回は実行のタイミングを制御したいので、DAGに定義してあるschedule_intervalはコメントアウトしています。これで外部トリガーでのみの実行が可能になります。何で実行するかどうかは後述します。

□ DAG GraphView

GraphViewをみてみると、意図したDAGになっていることがわかります。
スクリーンショット 2018-12-31 19.03.30.png

このGraphViewによるとt4の処理であるouput_finishは1回しか表示されていないので、先ほどの予想が外れている匂いがプンプンしてきましたね(笑)

□ DAGの実行・デプロイ

実行コマンドは以下になります。

airflow@d6e6793bc451:~$ airflow scheduler

これでhttp://(EC2(SGでポート8080解放済み)のIP):8080にアクセすると
スクリーンショット 2018-12-31 18.50.36.png

左にon/offのスイッチがでてきます。これでワークフローの実行,非実行の切り替えが行えます。また、この状態でスクリプトを変更すると即座に反映されるかと思います。

実行結果の検証

先ほど外部トリガーに設定したので、さっそく実行してみます。ちなみに外部トリガーで一番簡単なのがDAGの右にある再生ボタンになります。
スクリーンショット 2018-12-31 18.55.17.png
これを押してみると、
スクリーンショット 2018-12-31 18.58.24.png
DAG Runsがrunnnigになり、Recent Taskに4が表示されます。これは、DAGが4つのTaskで構成されているためです。

□ 最終的な表示

スクリーンショット 2018-12-31 19.37.50.png
DAGの結果がsuccessになり、 Recent Taskのsuccessに4が表示されています。無事にワークフローが実行されました。

□ 出力結果の確認

~/output/test.txtの内容が以下になりました。

output_st_start
Mon Dec 31 10:14:17 UTC 2018
output_st_end
output1start
Mon Dec 31 10:15:04 UTC 2018
output1end
output2start
Mon Dec 31 10:15:14 UTC 2018
output2end
output_fin_start
Mon Dec 31 10:15:24 UTC 2018
output_fin_end

output_finishの出力が1回しかされていないので、自分の予想は外れました(笑)ということは、データを2つに分けてそれぞれ異なる処理させたあと統合して処理するなどのワークフローも組めるという感じですね。

おわりに

Airflowを初めて触ってみましたが、日本語のドキュメントが意外に少なくて大変でした。ダッシュボードが充実していたり、スクリプトでの定義が簡単そうなので前評判通り使い勝手が良さそうです。

今回は最低限使えるレベルでの内容をまとめました。次回はもう少し凝ったETLをAirflowで管理・実行してみたいです。データベース接続やKubernetesとの連携などまだまだ触り足りないので!

その際に業務導入の検討や使い勝手の評価もしてみたいと思います。
Airflow入門の役にたてば嬉しいです。

参考

91
79
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
91
79