LoginSignup
147
138

More than 5 years have passed since last update.

Airflow - データパイプラインのスケジュールと監視をプログラムしてみた

Last updated at Posted at 2015-11-08

Airflowを導入するとcronのバッチ処理でエラーが起きてログファイルを漁った結果、Log出力が甘くて原因特定できないぐぬぬぬぬもうやだまじつらい、みたいなことが仕組みで防げるようになります。

Airflowってご存知でしょうか?

Airbnb社がオープンソースで公開しているデータパイプラインのスケジュールとモニタリングツールです。簡単に言うとジョブツリーを構築できる高機能なcron。Python2系で開発されていてpip install可能なオープンソースのソフトウェアです。AWSが年1で開催している大規模イベントre:Invent 2015で複数の会社がAirflowを利用していると発表し注目を集めました。Yahooの発表を読んで興味を持ちました。この記事はAirflowをプロジェクトに導入すべきか検討、検証したメモです。

■ プロジェクトの解析タスクをAirflowに乗せてみた
スクリーンショット 2015-11-09 4.06.06.png

Airflowはデータパイプラインのスケジュールと監視をプログラムできるシステム

日本語資料が少なく、最初Airflowで一体何が出来るか判りませんでした。インストール方法は後回しにして、まず使い道に関する情報を補完したいと思います。1週間ほど検証したあと振り返ってみるとAirflowリポジトリの最初の1行目に書いてあるdescriptionがAirflowのことを一番適切に表しているなと思います。

Airflow is a system to programmatically author, schedule and monitor data pipelines.(超訳:Airflowはプログラムすることで次の機能を提供するシステムです。例:データパイプラインのスケジュール、監視など)

Airflowをスケジュールと監視以外の用途に利用すると、例えばデータの操作コマンドをゴリゴリ書いたり、手動でタスクを実行する用途に利用すると途端に使い辛いシステムになるので導入前の利用用途の切り分けが重要です。

具体例で学ぶ: DBからGoogleBigQueryにデータを投入する処理

解析系のタスクをAirflowで書いてみました。S3に一度データを置いているのは、途中で処理が失敗したときにDBから再取得すると取得時間がズレる対策と、dumpコマンドを1日に何度も叩きたくないからです。

■ 仕様
1日に1度MySQLから必要なデータをdumpしてS3に保存したあと、GoogleCloudStorageにコピーしてBigQueryにデータを投入する。GoogleCloudStorageにデータが設置された段階で関係者にメールを送信する。

■ 仕様をタスクに分解
1. AWS Data Pipeline を使用した MySQL データの Amazon S3 へのエクスポート
2. S3からGoogleCloudStorageにデータコピー
3. GoogleCloudStorageからBigQueryにデータ投入
4. メール送信タスクの実行

■ Airflowの設計上やってはいけないこと
jenkinsでも推奨されているように、ジョブの詳細の定義は各シェルやコマンドに集約すべきです。Airflow側でゴリゴリビジネスロジックを書いてしまうと、更新管理や差分の記録反映が難しくなります。Airflow側でプログラムすべき処理はフローとスケジュールに集中すべきです。

■ Airflowのタグ実装例
仕様をAirflowでプログラムしていきます。
export_db.pyにロジックを書く(日本語のコメントを1文字でも書くとAirflowのタスクとして認識されなくなる実装はまじで糞だと思いました。)

export_db.py
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='export_db', default_args=args)

CMD_BASE_DIR = '~/analyze/{}'

# cmd file name
EXPORT_DB_TO_S3_CMD = 'export_db_to_s3.sh'
COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD = 'copy_s3_to_google_storage.py'
IMPORT_BIG_QUERY_CMD = 'import_big_query.py'
SEND_MAIL_CMD = 'send_mail.py'


def do_cmd(cmd):
    os.system(cmd)

# define task
# 1. db to s3
task1 = PythonOperator(
    task_id='1.' + EXPORT_DB_TO_S3_CMD,
    python_callable=do_cmd,
    provide_context=True,
    op_kwargs={'cmd': CMD_BASE_DIR.format(EXPORT_DB_TO_S3_CMD)},
    dag=dag)

# 2. s3 to cloud storage
task2 = PythonOperator(
    task_id='2.' + COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD,
    python_callable=do_cmd,
    op_kwargs={'cmd': CMD_BASE_DIR.format(COPY_S3_TO_GOOGLE_CLOUD_STORAGE_CMD)},
    dag=dag)

# 3. import bq
task3 = PythonOperator(
    task_id='3.' + IMPORT_BIG_QUERY_CMD,
    python_callable=do_cmd,
    op_kwargs={'cmd': CMD_BASE_DIR.format(IMPORT_BIG_QUERY_CMD)},
    dag=dag)

# 4. send mail
task4 = PythonOperator(
    task_id='4.' + SEND_MAIL_CMD,
    python_callable=do_cmd,
    op_kwargs={'cmd': CMD_BASE_DIR.format(SEND_MAIL_CMD)},
    dag=dag)

# define task stream
# 1 >> 2 >> 3 and 2 >> 4
task1.set_downstream(task2)
task2.set_downstream(task3)
task2.set_downstream(task4)

# define start task
run_this = task1

■ タスクがちゃんと動くかテストする
python ~/airflow/dags/export_db.py

■ Airflowを再起動してタスクを反映
タスク反映にAirflow再起動が必要な点は本当に糞だと思います。このデメリットもあってタスクにビジネスロジックを書くことは推奨できません。

■ タスクが登録されていることをブラウザで確認
デフォルトだとトップページのhttp://localhost:8080/admin/にexport_dbが登録されている。

■ スケジュール実行
pythonで定義したスケジュール通りに動かすためにはairflow schedulerで動き出します。今回の場合は開始日時が7日前と設定されているので、延々と無限ループで動き続けます。

■ 画像1. 規定したタスクのグラフビュー
スクリーンショット 2015-11-09 4.43.41.png

■ 画像2. トップのDAG一覧
スクリーンショット 2015-11-09 4.15.18.png

■ 画像3. タスクのツリービュー
スクリーンショット 2015-11-09 4.17.32.png

cronと比べてAirflowのここが便利だと思った

cronでの運用と比較したときの導入メリットです。主観多めです。

■ 1. タスク毎の実行時間が可視化される
実行時間のサマリーを取って、綺麗なWebビューでグラフ表示される点は素晴らしいと思います。cronでバッチ実行してログに吐いているとサマリー取るのも一苦労です。

■ 2. エラーログがとにかく見やすい
何時何分に実行されたタスクでどんなエラーが発生して、どんな標準エラー出力したかをWebから見れます。cronでローテーションすらされてないログファイルに吐き出し続ける実装とは雲泥の差があります。エラーが起きてログファイルを漁り、結果Log出力が甘くて原因特定できないぐぬぬぬぬ、もうやだまじつらい、みたいなことが仕組みで防げるのは素晴らしいと思います。

■ 3. ジョブツリーを構成できる
Aタスクが終わってからBタスク実行と明確に定義できます。

■ 4. ツリー変更や実行時間変更をgitの記録に残せる
スケジュールやツリーをpythonでプログラムすることになるのでgitで管理すれば変更履歴が残ります。

jenkinsとAirflowは用途が異なる

Airflowでは手動でタスク実行が出来ません。jenkinsとは製品として目指す方向が違うのでAirflowではあえて実装していないのだと思います。(AirflowではCeleryExecutorを導入することでタスクの手動実行が可能でした訂正致します。なぜCeleryExecutorが必要かはこちらのissues参照)Airflowではタグ名の設定すら、すべてpythonコマンド内で定義します。WebGUI上からは実行状況をモニターするのみであり、一切タスクの挙動を変更することができません。こちらもあえてそうしてるんじゃいなかなーと思います。Airflowは尖っている製品なので、この辺りを見誤ってjenkinsの代替製品として運用しようとすると、なにこれ使えないとなる可能性がありそうです。

まとめ:Airflowが向いている用途

完全に自動化されているタスクで、普段は存在を意識しておらず月1で障害が起きたときにだけタスクのどこでエラーが起きたかみたい。といった用途に向いていると思いました。

導入方法

mysqlが立ち上がっている自分のローカル環境だと10分でインストールして起動してブラウザで動作確認することができました。

install

公式のReadme.rstファイルを読みながらインストールしました。

mkvirtualenv airflow
mkdir ~/airflow
cd ~/airflow/
pip install airflow[mysql]
export AIRFLOW_HOME=~/airflow
airflow initdb

run

airflow webserver -p 8080

疎通確認

ブラウザでhttp://localhost:8080/にアクセス

最初のタスクを定義する

mkdir ~/airflow/dags
touch ~/airflow/dags/__init__.py
touch ~/airflow/dags/export_db.py
# export_db.pyにタスク定義を書く

タスクのテスト

python ~/airflow/dags/export_db.py

一覧

airflow list_dags
airflow list_tasks export_db
airflow list_tasks export_db --tree

スケジュール実行

airflow scheduler

147
138
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
147
138