Edited at

Luigi によるワークフロー管理

More than 1 year has passed since last update.

Workflow Hacks! #1 が開催されるなど、データ分析のワークフロー管理が見直されつつある今日この頃ですが、Treasure Data 社内ではこれまでのところ Luigi を主なツールとして利用しています。ここでは Luigi を使ったワークフローがどのようなものであるかを紹介します。

(追記: 2016-09-18)その後、社内ワークフローは Digdag(主にSQLのクエリ実行に利用)、及び Airflow(主にスクリプトの実行に利用)へと置き換わり、Luigi を利用することはほぼなくなりました。


ワークフロー管理ツールとは?

データ分析における「ワークフロー管理ツール」とは、データ処理の過程で必要となる一連のタスク(データのロードや、クエリの実行など)を自動化し、管理するために設計されたソフトウェアです。例えば、毎日決まったタスクをスケジュール実行したり、問題が起きた場合に通知するような目的で利用されます。

簡単なタスクなら専用のツールを用いるまでもなく、シェルスクリプトやスクリプト言語などで記述すればいいのですが、何百ものタスクを数時間かけて実行するようなケースだと、単純なスクリプトでは管理するのも難しくなってきます。特にビッグデータを扱う場合、データ処理を小さなタスクに分割して並列実行したり、予期せぬ障害からのリカバリーについて考えねばならないことも多く、そうした作業を少しでも楽にするために様々なツールが開発されています。

Treasure Data 社内では、サービス全体の稼動状況の把握や、顧客別の利用量を確認するために、BIツールを用いたダッシュボードを作成しています。その元となるデータは様々な場所から集めてくる必要があり、そのタスクを制御する中心的な役割を果たすのがワークフロー管理ツールです。

ワークフロー管理ツールは、それ自体でデータの「抽出」「変換」「格納」という、いわゆる「ETL処理」を行なう場合もありますが、最近はむしろ外部のサービスを API でコントロールすることが多くなってきています。Treasure Data もサービスとして様々なアウトプットに対応していることから、ワークフロー管理ツールの役割は API によってその制御を行なうことが中心となります。


Luigi の特徴

Luigi は Python で実装されたワークフロー管理ツールの一種で、類似のソフトウェアの中では比較的単純なシステムです。例えば、次のような機能が提供されています。


  • タスク間の依存関係を定義し、それらを順に実行する

  • 複数のタスクを同時並列で実行する

  • タスクが失敗したときに自動的にリトライを行う

  • タスクのステータスを管理し、未完了のタスクのみを実行する

一方で、タスクのスケジュール実行のような基本的な機能がなく、足りないものは自分で補う必要があります。機能不足を感じることもありますが、コマンド一つで手軽に使えるという気楽さもあり、自分にとっては欠かせないツールの一つになっています。

Luigi は Python スクリプトに埋め込む形で、ライブラリとして利用します。例えば、"Hello, world!" と表示するタスクであれば、次のように記述します。

# tasks.py

import luigi

class HelloWorld(luigi.Task):
def run(self):
print('Hello, world!')

これを次のような感じでコマンドラインから実行します。

$ luigi --local-scheduler --module tasks HelloWorld

DEBUG: Checking if HelloWorld() is complete
/usr/local/lib/python2.7/site-packages/luigi/worker.py:261: UserWarning: Task HelloWorld() without outputs has no custom complete() method
is_complete = task.complete()
INFO: Informed scheduler that task HelloWorld() has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 22594] Worker Worker(salt=010233409, workers=1, host=Keisuke-MacBook.local, username=knishida, pid=22594) running HelloWorld()
Hello, world!
INFO: [pid 22594] Worker Worker(salt=010233409, workers=1, host=Keisuke-MacBook.local, username=knishida, pid=22594) done HelloWorld()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task HelloWorld() has status DONE
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=010233409, workers=1, host=Keisuke-MacBook.local, username=knishida, pid=22594) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 HelloWorld()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====


S3 ファイル一覧の取得

より実践的な例として、S3 に格納されたファイル一覧を作成することを考えましょう。例えば、過去3年間に渡って一億ほどのファイルが保存されているとします。S3 にはファイル一覧を取得する API がありますが、一度に取れる数は1000個までなので、少なくとも10万回以上 API を呼ばなければなりません。仮に一回の API 呼び出しが0.1秒で終わるとしても、100000 * 0.1 / 3600 = 2.7時間くらい掛かります。

もしこれを単純な Python スクリプトで回すとすれば、こんな感じになるでしょうか。

import boto

# S3 バケットに接続
s3 = boto.connect_s3()
bucket = s3.get_bucket('my-bucket')

# ファイル名とサイズの一覧を作成
with open('s3-files.csv', 'w') as f:
for key in bucket.list():
f.write("{},{}\n".format(key.name, key.size))

スクリプトを実行して、数時間で無事に終わればラッキーですが、実際には予期せぬエラーで失敗することもよくあります。例えば、ネットワークエラーや、ディスク溢れ、文字コードの変換エラーなどはよく目にします。エラーにはならなくとも、数時間たっても終わらなくて、いつ終わるのか見当もつかないことさえあるでしょう。そのような長時間のタスクを、一回のスクリプト処理で済ませようとするのは賢いやり方ではないので、タスクの分割を考えます。

予備知識として、ファイルのパスには次のように日付が含まれているとします。

2010-01-01/web1-access-2010-01-01.log.gz

2010-01-01/...
2010-01-02/web1-access-2010-01-01.log.gz
2010-01-02/...

過去3年間 = 1095日分の日付があるので、一日ごとに集計を行なうようにタスクを分割しましょう。次のスクリプトで s3_files_task() は日付を含んだ文字列を受け取り、その日の集計を行なうタスクです。これを必要な日数だけ繰り返し実行することで、適度に小さなタスクに分割できそうです。

# 指定した日付の集計を行なうタスク

def s3_files_task(prefix):
output_file = 's3-files-{}.csv'.format(prefix.replace('/', '_'))
with open(output_file, 'w') as f:
for key in bucket.list(prefix=prefix):
f.write("{},{}\n".format(key.name, key.size))

# タスクを繰り返し実行する
for prefix in bucket.list(delimiter='/'):
s3_files_task(prefix.name)

一日ごとに出力ファイルが得られるので、もし途中でエラーになったとしても、既に終わった部分はスキップして、途中から再開するのも簡単になるでしょう。


Luigi によるタスクの定義

先程のスクリプトを Luigi で書き換えると、次のようになります。Luigi では output() メソッドによって、タスクが生成するファイルを明記します。指定したファイルが既に存在すればそのタスクはスキップされ、存在しなければ run() メソッドが実行されます。つまり Luigi とは、タスクが失敗する可能性をあらかじめ考慮の上で、まだ終わっていないタスクだけを選択的に実行するためのフレームワークです。

import luigi

# 指定した日付の集計を行なうタスク
class S3FilesTask(luigi.Task):
prefix = luigi.Parameter()

def output(self):
# ここで指定したファイルが存在しなければ run() が実行される
return luigi.LocalTarget('s3-files-{}.csv'.format(self.prefix.replace('/', '_')))

def run(self):
with self.output().open('w') as f:
for key in bucket.list(prefix=prefix):
f.write("{},{}\n".format(key.name, key.size))

class MainTask(luigi.WrapperTask):
def requires(self):
# タスクを繰り返し生成する
for prefix in bucket.list(delimiter='/'):
yield S3FilesTask(prefix.name)

タスクに結び付くもう一つのメソッドとして requires() があります。これはそのタスクの実行前に完了すべき別のタスク、つまり依存関係を定義します。上の例では、MainTask は多数の S3FilesTask に依存しており、その全てが完了することを要求します。

Luigi スクリプトを実行するには、次のようにトップレベルのタスクを呼び出します。そうすると自動的に依存関係が解決されて、未完了のタスクが全て実行されます。途中でエラーが起きたとしても、もう一度同じコマンドを実行すれば、まだ終わっていないタスクを Luigi が見つけてくれます。

$ luigi --local-scheduler --module tasks MainTask

一般的に、Luigi のタスクは次のように定義されます。少し特殊なタスクとして、requires() だけを持つ luigi.WrapperTask や、output() だけを持つ luigi.ExternalTask がありますが、まずは次の基本系だけ覚えれば十分です。

class MyTask(luigi.Task):

# パラメータを定義する
my_param = luigi.Parameter()

def requires(self):
# 依存するタスクを定義する
return AnotherTask()

def output(self):
# 出力するファイルを定義する
return luigi.LocalTarget('...')

def run(self):
# タスクを実行して結果を保存する
with self.output().open('w') as f:
f.write(...)

実際のところ、Luigi を用いたワークフローとは、この luigi.Task を用いたプログラミングであり、これを使って何をするかはユーザに任されています。最初から用意されている機能はそれほど多くありませんが、自分で run() メソッドを実装することで、あらゆる外部サービスを制御できるのが Luigi の利点です。


タスクの並列実行

タスクが増えてくると、スピードアップのために並列実行したくなります。どのくらいの並列処理が許されるかは、利用するバックエンドシステムによって異なります。例えば、Treasure Data の Hive であれば、同時に 10 や 20 のクエリを実行しても問題ありませんが、Presto の場合は同時実行が 1〜5 程度(プランによる)に抑えられているため、あまり並列度を上げても意味がありません。

Luigi でタスクの並列度を指定するには、--workers オプションを利用します。次の例では、最大 4 プロセスを利用した並列処理が行なわれます。

$ luigi --local-scheduler --workers 4 --module tasks MainTask

以前、Luigi スクリプトで S3 上の数億ファイルのサイズを調べたことがあります。再帰的に生成されたタスク数は3万ほどで、16コアのマシンでシステム負荷やエラー発生率を見ながらプロセス数を調整し、最終的に 50 プロセスほどで数時間かかった覚えがあります。途中で何度もエラーが発生しましたが、Luigi のお陰で全てのタスクを最後まで実行することが出来ました。このように大量の API を呼び出すタスクを管理するところで、Luigi は大いに役立ってくれます。

一方で、Luigi にはノードを越えてタスクを分散させる機能はなく、多数のマシンを使った高度な分散処理には向いていません。Luigi の主な役割はそれ自体でデータ処理を行なうことではなく、外部の分散システムに命令を出すための司令塔として用います。


データのパイプライン処理

一つのタスクの結果を次のタスクで使いたいことがよくあります。例えば、外部ストレージから取り出してきたデータをスクリプトで加工し、それを Treasure Data にインポートしたいと考えるなら、少なくとも2つか3つのタスクに分割するといいでしょう。


  1. 外部ストレージからのデータ抽出

  2. 抽出したデータの加工

  3. 加工したデータのアップロード

このタスク分割は、特に開発を行なっているときに効果的です。2 のデータ加工は、何度も試行錯誤しながら繰り返し実行するので、その度にデータを抽出してくるのは明らかに無駄です。1 と 2 の処理はタスクを分割し、繰り返し 1 が実行されることのないようにします。Luigi では、これを次のように実装します。

class Task1(luigi.Task):

def output(self):
# 出力するファイルを定義する
return luigi.LocalTarget('...')

def run(self):
# 外部ストレージからデータを抽出する
data = ...
# 抽出したデータを保存する
with self.output().open('w') as f:
f.write(data)

class Task2(luigi.Task):
def requires(self):
# Task1 を実行する
return Task1()

def output(self):
# 出力するファイルを定義する
return luigi.LocalTarget('...')

def run(self):
# Task1 のファイルを読み込む
with self.input().open() as f:
data = f.read()
# データを加工する
data = ...
# 加工済みのデータを保存する
with self.output().open('w') as f:
f.write(data)

Task2 に含まれる self.input() は、前のタスクの self.output() と等価になります。これらを用いて、タスクからタスクへとデータを受け渡していくのが、Luigi におけるデータパイプラインの基本的な記述方法です。

上の例のように LocalTarget を用いるのは、比較的小さなデータをローカルファイルとして扱う場合に限られます。そのようなケースでは、自分はよく Pandas で中間処理を行ないます。例えば、外部のデータベースから取り出したデータを加工してレポートを作るなら、次のような感じになります。

import pandas as pd

class ExtractTask(luigi.Task):
def output(self):
return luigi.LocalTarget('data.csv')

def run(self):
# SQL を実行し、DataFrame として読み込む
query = '''
SELECT ...
FROM some_table
WHERE ...
'''

df = pd.read_sql(query, ...)
# DataFrame を CSV ファイルとして保存する
with self.output().open('w') as f:
df.to_csv(f, index=False)

class ReportTask(luigi.Task):
def requires(self):
return ExtractTask()

def output(self):
return luigi.LocalTarget('report.csv')

def run(self):
with self.input().open() as f:
df = pd.read_csv(f)
# データを加工して保存
df = ...
with self.output().open('w') as f:
df.to_csv(f, index=False)


タスクの再実行

既に完了したタスクを、もう一度やり直したいことがよくあります。Luigi はタスクの完了を output() の存在によって確認しているので、生成されたファイルをただ削除すればタスクを再実行できます。

ファイルのあるなしで再実行を制御するのは、ある意味シンプルでわかりやすい方法ですが、慣れないと混乱を招くかもしれません。特に、UI からタスクを再実行する、といった制御が一切行なえないので、複数人で安定運用するフェーズでは扱い辛いと感じるかもしれません。

output() で指定するファイル名は、タスクの再実行や、不要になったファイルを削除するときのことを考えて、一定の命名規則を定めるとよいでしょう。自分の場合、ほとんどのタスクは次のようなルールでファイルを生成しています。

class MyTask1(luigi.Task):

# ほとんどのタスクは日時を最初のパラメータとして受け取る
date = luigi.DateParameter()

def output(self):
# 生成するファイル名: /tmp/luigi/YYYY-MM-DD/app.tasks/MyTask1(date=2015-12-06)
output_path = os.path.join(
'/tmp/luigi', # 作業ディレクトリ
self.date.strftime('%Y-%m-%d'), # 日付
__name__, # モジュール名
str(self) # タスク名
)
return luigi.LocalTarget(output_path)

ここで注意点として、システムの障害等でファイルが消えてしまうと、意図せずタスクが再実行されてしまう可能性があることには気を付ける必要があります。再実行されると困るようなタスクは LocalTarget を使わずに、HdfsTargetS3Target のような永続的なストレージを output() に指定すべきです。


リトライ制御とエラー通知

長時間に及ぶワークフローを実行する場合には、途中でタスクが失敗しても、成功するまで何度かリトライしてほしいところです。しかし、--local-scheduler オプションを用いたスタンドアロンの実行では、自動的なリトライは行なわれないまま終了してしまいます。リトライを制御するためには、別途 luigid サーバを起動しておく必要があります。

http://luigi.readthedocs.org/en/stable/central_scheduler.html

スクリプトの実行時には --local-scheduler を外します。これでタスクのスケジューリングが luigid によって集中管理され、複数の Luigi スクリプトを同時に実行しても同じタスクが重複するようなことがなくなります。実際、プロダクション環境では常に luigid を利用することが推奨されています。

デフォルトだと、エラーが発生してから15分後にリトライが行なわれ、それでも駄目だとタスクが失敗します。これをカスタマイズするには、スクリプト実行時に設定ファイル(luigi.cfg)を与えます。次の例では、5分間隔で10回のリトライを行ないます。

[core]

max-reschedules: 10
worker-keep-alive: true
retry-external-tasks: true

[scheduler]
retry-delay: 300

エラーが発生したときに通知を受け取るには、次のようなメールの配送設定が必要です。この例では Amazon SES を用いてメールを送ります。

[core]

error-email: luigi-owner@example.com
email-sender: luigi@example.com
email-prefix: [luigi]

[email]
type: ses
AWS_ACCESS_KEY: AKIA...
AWS_SECRET_KEY: VMoD...

詳細な設定についてはドキュメントを参照して下さい。


リカバリ可能なワークフロー

管理するタスクが増えるにつれて、日々様々なエラーを目にするようになります。あるべきはずのデータがない。いつもは繋がるサーバに繋がらない。それまで動いていたクエリがエラーになる、などなど。(いつもご不便をお掛けしております・・)

エラーに強いワークフローを作るには、エラー通知を受け取ってから何かするのではなく、エラーから自動回復するようなリカバリ可能なタスクの実装を心掛けることが重要です。

もっとも単純な方法は、欲しいデータは毎回作り直す、つまり "replace" モードで運用することです。例えば、過去30日間のサービス利用状況を毎日レポートしたいとします。仮に一日一回のバッチ処理で、前日分の集計結果だけを書き出していると、それがエラーになった場合にやり直さなければいけません。しかし、毎日30日分のデータを集計し直し、レポートを置き換えるようにしておけば、その日のエラーは放置しても、翌日にはまた正しい結果が得られるでしょう。いずれ回復することがわかっているなら、必ずしも全てのエラーに対処する必要はなくなり、余裕を持ってワークフローを運用出来るようになります。

とはいえ、毎回やり直すのは難しいタスクもあります。一つは性能上の理由で、全期間を集計するには時間が掛かりすぎる場合。もう一つはデータのインポートなど、本質的に差分だけを扱う必要がある場合。こうしたタスクは、既存のデータに対する追記、つまり "append" モードで運用することになります。しかし、"append" モードは、エラーが発生するとデータが欠損し、繰り返し実行するとデータが重複するため、確実に一回だけ実行されるよう気を付けねばなりません。

このうち前者の欠損問題を解決するには、まだ終わっていないタスクだけを実行するという Luigi のタスク管理が役立ちます。特に、毎日(あるいは毎時)実行するような定期的なタスクでは、後述する Range 実行が便利です。(ちなみに後者の重複問題を回避するには、タスクを「べき等」に実装する方法がありますが、話が長くなるので割愛。)


Range 実行とバックフィル

毎日一回、必ず実行したいようなタスクでは、次のように RangeDailyBase を利用するのがお勧めです。

$ luigi --module tasks RangeDailyBase --of MyTask --start 2015-01-01

こうすると、MyTask は指定した日(ここでは 2015-01-01)から現在までの全ての日をパラメータとして次々と実行されます。一度タスクが成功すれば、その日のタスクは(ファイルを消さない限りは)二度と実行されないので、確実に一回だけ実行したい場合に役立ちます。タスクが失敗した場合でも、翌日に同じコマンドを実行すれば、失敗した日の分まで再度実行してくれます。

--start 以降に生成されたファイルは消さずに残すようにしておかないと、タスクが再実行されてしまうので注意が必要です。開始日の代わりに、「過去30日」のような相対的な期間で実行したければ、次のように指定する方法もあるようです。

$ luigi --module tasks RangeDailyBase --of MyTask --days-back 30 --reverse

この辺りの話は、次のページに詳しく記述されいます。

http://luigi.readthedocs.org/en/stable/luigi_patterns.html


スケジュール実行

Luigi にはタスクをスケジュール実行する機能がなく、cron 等で呼び出してやる必要があります。実際、Luigi は cron を想定して設計されており、これといった不都合はなさそうです。

自分の場合、最初に Luigi の実行環境を整えるための簡単なラッパースクリプトを用意しています。

#!/bin/bash

# 作業ディレクトリに移動
cd $HOME/workflow
# Luigi スクリプトのパスを設定
export PYTHONPATH=$HOME/workflow
# 各種の環境変数を .envrc というファイルからロード
source .envrc
# Python の環境設定(minicondaを利用)
source $HOME/miniconda/bin/activate workflow
# コマンドを実行
exec $*

そしてスケジュール実行したい内容を適当なファイルに記述。

# luigi.crontab

LUIGI="/home/luigi/wrapper.sh luigi"

# field allowed values
# ----- --------------
# minute 0-59
# hour 0-23
# day of month 1-31
# month 1-12 (or names, see below)
# day of week 0-7 (0 or 7 is Sun, or use names)

00 00 * * * $LUIGI --workers 2 --module app.s3_tasks S3DailyTasks
01 00 * * * $LUIGI --workers 4 --module app.td_tasks RangeDailyBase --of TDDailyTasks --start 2015-01-01
...

これを一般ユーザの crontab コマンドで登録すれば、スケジュール実行開始。

$ crontab luigi.crontab

スケジュールを変更するには、ファイルを編集して crontab で再登録。あるいは一時的に変更するだけなら crontab -e で編集。登録内容の確認は crontab -l、設定の削除は crontab -r

cron によるスクリプト実行そのものに失敗すると、当然ながら Luigi によるエラー通知は行なわれず、syslog へのエラーログとなるので注意が必要です。


Treasure Data のジョブを実行する

ここまで Luigi の一般的な使い方を説明しましたが、もちろん Treasure Data のジョブを Luigi から実行することも可能です。クエリの実行をタスクとして定義するには luigi-td というパッケージも使えますが、中間テーブルの作成や partial_delete などの実行まで考えると、結局のところ素の Luigi から tdclient を直接使う方がわかりやすいかもしれません。

この辺りはまだ発展途上であるため、いずれまた改めて取り上げたいと思います。


まとめ

以上のように、Luigi はどちらかと言うとプログラマ向けのワークフロー管理ツール(というかライブラリ)です。シンプルゆえに、その気になれば何でも実現できますが、ある程度の Python スキルとシステム管理スキルがなければ扱うのは難しいかもしれません。

一方で、データ分析のための言語として Python はますます人気が高まっており、データ処理とワークフロー管理の両方をまとめて記述できる Luigi は、今後も一定の人気を集めそうです。Python を使って日々データを扱っている人は、ぜひ覚えてみてください。