はじめに
せっかくの連休なので新しいこと勉強してみたいと思い、reInvent2018にて色々なところで名前が出てきた「Airflow」について触れていきます!
なかなかイケてるという噂なので、「Airflowとは何か」から「EC2での導入」「簡単な操作方法
」までこの記事ではまとめてみようと思います。
最近CI/CD周りに興味があるので勉強して業務に活かせるか検討したいと思います。
-----12/19 追記------
Airflow関連の記事も増えてきたのでリンクしておきます。
Airflowでモデルの学習からデプロイまでをやってみた
Airflowをここ3ヶ月触ってみた
Airflow - データパイプラインのスケジュールと監視をプログラムしてみた
What's Airflow ?
Airflow is a platform to programmatically author, schedule and monitor workflows.
Airflowはプログラムでワークフローを作成、スケジュール設定、監視するためのプラットフォームである(公式ドキュメントから引用)
ということで、ETL周りをGUIベースで実行・管理をサポートしてくれるツールのようです。
運用者としては、GUIベースでの設定やダッシュボードによる監視ができるのは嬉しい点ですね。また、DAG(Directed Acyclic Graph)のグラフ理論を用いたワークフロー構築がなされています。DAGは有向非巡回グラフと呼ばれ、ETLタスクを以下の図のようにDAGっぽくつなぎ合わせワークフローを形成するイメージです。
参考:有向非巡回グラフ(DAG)の意味とトポロジカルソート
歴史としては、Airbnb社で2014年に始まり、2015年にオープンソースとして公開後、2016年にApachからのサポート受け「Apach Airflow」となって今に至っているようです。
Github:https://github.com/apache/incubator-airflow
イケてるポイント
Airflowがイケてると思われるポイントをまとめてみました。この点を実際に使ったあとに評価していきたいと思います。
- 実行履歴の可視化(GUIで管理可能)
- 手動or自動によるワークフローのリトライ
- 並列処理
EC2へ導入
ホストへインストールを試みたところ、pip installではまってしまったため、dockerで環境を持ち込むことにしました。
以下のdockerリポジトリからpullしてきました。
https://hub.docker.com/r/puckel/docker-airflow
sudo docker pull puckel/docker-airflow
pullしてきたら、以下で立ち上げます。
sudo docker run -p 8080:8080 --name airflow puckel/docker-airflow
この状態でhttp://(EC2(SGでポート8080解放済み)のIP):8080(/admin/)にアクセスすると
airflowの管理者ページに飛ぶことができます。
また、立ち上げたコンテナに対して
sudo docker exec -it airflow /bin/bash
でコンテナに入り、airflow webserver
を実行しても同様にログが流れてairflowが起動します。
tutorial
ViewとCode
まず、tutorialをベースにどんな感じになっているかを調査していきたいと思います。
上記のdockerを立ち上げて、ブラウザから管理者ページにアクセスするとサンプル一覧が表示されます。
この一番下にtutorialというDAGがあります。
クリックすると、簡単なbash処理が3つ表示されます。
ここに表示されているのはtutorailというDAGの中のprint_date,sleep,templatedというtask_idがどのようなワークフローで組まれているかが表示されています。
では、このそれぞれのtask_idがどのように設定されているかをみてみます。Graph View
があるメニュー上のCode
をクリックしてみると、tutorialのスクリプトが表示されます。
# -*- coding: utf-8 -*-
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'dag': dag,
# 'adhoc':False,
# 'sla': timedelta(hours=2),
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function,
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'trigger_rule': u'all_success'
}
□ airflowにはいくつかOperatorが用意されている
Operatorは簡単に言うと、何で処理を実行するかという設定になります。このスクリプトではbashのみの処理のため、from airflow.operators.bash_operator import BashOperator
をインポートしています。
□ start_dateは必須の設定
start_dateはいつからこのDAGを実行するかという設定になります。ここで一つポイントなのが、過去に設定した場合、その設定した過去から処理が始まるという点です。
例えば、intervalの設定を12時間ごとにしてstart_dateを2日前にしてDAGを実行すると、2日前の12時,24時,1日前の12時,24時の分も実行されます。
dag = DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
dag=dag,
)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag,
)
t1 >> [t2, t3]
□ DAGで大枠を設定
第一引数がDAG名になります。GUIで表示されます。このDAGの説明をdescriptionで設定できたりします。DAGで一番重要なのがschedule_intervalです。このスクリプトのようにtimedelta(days=1)
で1日ごとの設定ができたり 、cronのように* * * * * command
でも設定できます。
□ 細かな処理を定義
今回は3つ定義されています。先ほどのViewで表示されていたprint_date,sleep,templatedがここのtask_idに設定されています。
□ t1 >> [t2, t3]
最後にワークフローを構築します。このスクリプトように直感的にもかけますし、他の書き方あるようです。
参考:https://airflow.incubator.apache.org/tutorial.html
ここまでで、管理者ページに表示されていたGraphViewと実際の処理スクリプトとの関係がわかったかと思います。
ただ、このtutorialを実行しても結果がわかりにくいので処理結果がわかりやすいようなDAGを構築してみます。
tutorial_dev
tutorialをベースにした検証用DAG:tutorial_devを構築します。
まず、airflowがどこのスクリプト参照しているかを確認します。
上記のコンテナにはいると、airflow.cfgがあります。このファイルにairflowの設定がまとまっています。
[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /usr/local/airflow
# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
# This path must be absolute
dags_folder = /usr/local/airflow/dags
# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /usr/local/airflow/logs
上記のdags_folder
がairflowが参照しているDAGのフォルダーになります。しかし、普通にコンテナに入るとこのフォルダがないのでdagsを作ります。
dags直下に以下のようなtutorial_dev.pyを作ります。
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 12, 31),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
}
dag = DAG(
'tutorial_dev',
default_args=default_args,
catchup=False,
# schedule_interval=timedelta(minutes=3),
)
bash_command = """
echo output_st_start >> ~/output/test.txt
date >> ~/output/test.txt
echo output_st_end >> ~/output/test.txt
"""
t1 = BashOperator(
task_id='output_start',
bash_command=bash_command,
dag=dag,
)
bash_command = """
echo output1start >> ~/output/test.txt
date >> ~/output/test.txt
echo output1end >> ~/output/test.txt
sleep 3
"""
t2 = BashOperator(
task_id='output1',
depends_on_past=False,
bash_command=bash_command,
dag=dag,
)
bash_command = """
echo output2start >> ~/output/test.txt
date >> ~/output/test.txt
echo output2end >> ~/output/test.txt
sleep 3
"""
t3 = BashOperator(
task_id='output2',
depends_on_past=False,
bash_command=bash_command,
dag=dag,
)
bash_command = """
echo output_fin_start >> ~/output/test.txt
date >> ~/output/test.txt
echo output_fin_end >> ~/output/test.txt
"""
t4 = BashOperator(
task_id='output_finish',
depends_on_past=False,
bash_command=bash_command,
dag=dag,
)
t1 >> [t2, t3] >> t4
□ 簡単な仕様説明
ワークフローが意図した順序で実行されているかを確認したいため、taskとしてdate >> ~/output/test.txtを定義します。
ここで1つ検証要素を追加します。今回、構築したワークフローt1 >> [t2, t3] >> t4ですがt2とt3に分岐したあとt4はどうなると思いますか?個人的な予想は分岐後にt4がそれぞれ実行されると思ってますので、以下の出力結果を予想します。
t1 start
date
t1 end
t2 start
date
t2 end
t3 start
date
t3 end
t4 start
date
t4 end
t4 start
date
t4 end
後ほど、答え合わせします。
□ DAGスクリプトのテスト
テストするにはpython ~/dags/tutorial_dev.py
を実行します。これでエラーが出なければスクリプト的なエラーがないことを確認することができます。試しにimportしてあるdatetimeを消してテストしてみます。
airflow@d6e6793bc451:~$ python dags/tutorial_dev.py
Traceback (most recent call last):
File "dags/tutorial_dev.py", line 11, in <module>
'start_date': datetime(2018, 12, 31),
NameError: name 'datetime' is not defined
と言うふうに検出してくれます。スクリプト内容が実行されるわけではありません。
□ 外部トリガー実行
今回は実行のタイミングを制御したいので、DAGに定義してあるschedule_interval
はコメントアウトしています。これで外部トリガーでのみの実行が可能になります。何で実行するかどうかは後述します。
□ DAG GraphView
GraphViewをみてみると、意図したDAGになっていることがわかります。
このGraphViewによるとt4の処理であるouput_finishは1回しか表示されていないので、先ほどの予想が外れている匂いがプンプンしてきましたね(笑)
□ DAGの実行・デプロイ
実行コマンドは以下になります。
airflow@d6e6793bc451:~$ airflow scheduler
これでhttp://(EC2(SGでポート8080解放済み)のIP):8080にアクセすると
左にon/offのスイッチがでてきます。これでワークフローの実行,非実行の切り替えが行えます。また、この状態でスクリプトを変更すると即座に反映されるかと思います。
実行結果の検証
先ほど外部トリガーに設定したので、さっそく実行してみます。ちなみに外部トリガーで一番簡単なのがDAGの右にある再生ボタンになります。
これを押してみると、
DAG Runsがrunnnigになり、Recent Taskに4が表示されます。これは、DAGが4つのTaskで構成されているためです。
□ 最終的な表示
DAGの結果がsuccessになり、 Recent Taskのsuccessに4が表示されています。無事にワークフローが実行されました。□ 出力結果の確認
~/output/test.txtの内容が以下になりました。
output_st_start
Mon Dec 31 10:14:17 UTC 2018
output_st_end
output1start
Mon Dec 31 10:15:04 UTC 2018
output1end
output2start
Mon Dec 31 10:15:14 UTC 2018
output2end
output_fin_start
Mon Dec 31 10:15:24 UTC 2018
output_fin_end
output_finishの出力が1回しかされていないので、自分の予想は外れました(笑)ということは、データを2つに分けてそれぞれ異なる処理させたあと統合して処理するなどのワークフローも組めるという感じですね。
おわりに
Airflowを初めて触ってみましたが、日本語のドキュメントが意外に少なくて大変でした。ダッシュボードが充実していたり、スクリプトでの定義が簡単そうなので前評判通り使い勝手が良さそうです。
今回は最低限使えるレベルでの内容をまとめました。次回はもう少し凝ったETLをAirflowで管理・実行してみたいです。データベース接続やKubernetesとの連携などまだまだ触り足りないので!
その際に業務導入の検討や使い勝手の評価もしてみたいと思います。
Airflow入門の役にたてば嬉しいです。