要約
ワークフローマネージメントシステムLuigiをデータ分析に使うときに生じる様々な欠点を解消したSciLuigiというライブラリがある。便利だからみんな使おう。
フレーバーを理解してもらうだけの記事なので使い方とかは公式 doc みてね。
背景
何度かこれまでに記事にも書いた Luigi はここ数年の私のデータ分析作業の心強いお供であり続けて来ました。Luigi がデータ分析においていかに強力なツールかは他の記事による解説1に譲りますが、とにかく Luigi は「しっかりと結果の一貫性を保って計算する」ことに関するトラブルを大幅に軽減してくれ、計算処理用のプログラムを整理した形にしてくれました。
とはいえ、不満が全くないわけではありませんでした。Luigi は元々データ分析ではなく ETL 処理を運用することに主眼を置いており、トライアル&エラーを繰り返すデータ分析にはあまりそぐわない面もあります。例えば以下のような点です。
- ワークフローの途中にある結果が変更される状況でも、場合によってはワークフロー全体が再計算されない
- ワークフローの上流にあるタスクを簡単に切り替えることができない
- ワークフローの上流にあるタスクのパラメータ変更が容易でない
ワークフローの途中での再計算が行われた場合にも場合によってはワークフロー全体が再計算されない
例えば A -> B -> C
というワークフローがあった場合、通常 Luigi ではタスクCが完了しているかどうかをチェックし、タスクCの結果が存在していれば B や A が完了しているかはチェックされません。ただしこの問題については各タスクの complete()
メソッドをオーバーライドすることで回避可能です。
ワークフローの上流にあるタスクを簡単に切り替えることができない
例えば
from luigi import ExternalTarget, Parameter, Task
class ExternalData(Task):
def output(self):
return ExternalTarget('/path/to/file')
...
class Preprocess1(Task):
def requires(self):
yield ExternalData()
...
class Preprocess2(Task):
def requires(self):
yield Preprocess1()
...
class Fit(Task):
alpha = Parameter()
l = Parameter()
def requires(self):
yield Preprocess2()
...
というパイプラインがあった場合に、パイプライン全体を異なるデータに対して流すには Preprocess1
の依存タスクを変える必要があります。異なるデータソースに対して流すたびに新しくPreprocess1
からFit
までを再定義するのか? と考えると、これはあまりメンテナンス性が高いとは言えません。
まあこれくらいの例だったら Parameter
使えばいいじゃん、となる訳ですが、そうすると次で述べる問題にヒットします。
ワークフローの上流にあるタスクのパラメータ変更が容易でない
以下のようなタスクを考えましょう。
from luigi import ExternalTarget, Parameter, Task
class ExternalData(Task):
def output(self):
return ExternalTarget('/path/to/file')
...
class Fit(Task):
alpha = Parameter()
l = Parameter()
def run(self):
...
この Fit を様々なパラメータで実行して予測を行い、それぞれについて結果のプロットを作成するという状況は考えらえるものかと思いますが、そのまま書くとこうなるでしょうか。
class Predict(Task):
alpha = Parameter()
l = Parameter()
def requires(self):
yield Fit(alpha=alpha, l=l)
...
class Evaluate(Task):
alpha = Parameter()
l = Parameter()
def requires(self):
yield Evaluate(alpha=alpha, l=l)
...
class Plot(Task):
alpha = Parameter()
l = Parameter()
def requires(self):
yield Predict(alpha=alpha, l=l)
...
Fit に渡すパラメータである alpha や l が下流のタスクでまで必要になっているのがみて取れるかと思います。もちろんこれは Plot や Evaluate の変更に対する柔軟性を著しく損なってしまいます。
SciLuigi で行こうぜ
SciLuigi は「タスクの処理定義」と「依存関係の定義」を分離することによって上記3つの問題のうち後二者の問題に対処しています。
まず一つ目の例を SciLuigi で書き直しましょう。
from sciluigi import TargetInfo, Paramter, Task, WorkflowTask
class ExternalData(Task):
def out_csv(self):
return TargetInfo(self, '/path/to/file')
class Preprocess1(Task):
in_data = None
def out_csv(self):
return TargetInfo(self, '/path/to/preprocessed_file1')
def run(self):
...
class Preprocess2(Task):
in_data = None
def out_csv(self):
return TargetInfo(self, '/path/to/preprocessed_file1')
def run(self):
...
class Fit(Task):
in_data = None
alpha = Parameter()
l = Parameter()
def out_pkl(self):
return TargetInfo(self, '/path/to/pkl_model')
def run(self):
...
class PreprocessAndFit(WorkflowTask):
alpha = Parameter()
l = Parameter()
def workflow(self):
data = self.new_task('external_data', ExternalData)
perprocess1 = self.new_task('preprocess1', Preprocess1)
perprocess2 = self.new_task('preprocess2', Preprocess2)
fit = self.new_task('fit', Fit)
preprocess1.in_csv = data.out_csv
preprocess2.in_csv = preprocess1.out_csv
fit.in_csv = preprocess2.out_csv
return fit
各Task
の in_data
には依存タスクそのものを書かないで済むので、入力タスクごと変更するとしても PreprocessAndFit
だけを新たに定義すれば良いということになります。(普通はパラメータ使うでしょうけど、この程度なら)
では次の例に。
from sciluigi import TargetInfo, Paramter, Task, WorkflowTask
class ExternalData(Task):
def out_csv(self):
return TargetInfo(self, '/path/to/file')
...
class ExternalData2(Task):
def out_csv(self):
return TargetInfo(self, '/path/to/file')
...
class Fit(Task):
alpha = Parameter()
l = Parameter()
in_training_data = None
def out_model(self):
...
class Predict(Task):
in_model = None
in_predict_data = None
def out_prediction(self):
...
class Evaluate(Task):
in_predict_data = None
in_prediction = None
def out_evaluation_result(self):
...
class Plot(Task):
in_predict_data = None
in_prediction = None
in_evaluation = None
def out_plot(self):
...
class EvaluationWorkflow(WorkflowTask):
alpha = Parameter()
l = Parameter()
def workflow(self):
training_data = self.new_task('external_data', ExternalData)
prediction_data = self.new_task('external_data2', ExternalData2)
fit = self.new_task('fit', Fit, alpha=self.alpha, l=self.l)
predict = self.new_task('predict', Predict)
evaluate = self.new_task('evaluate', Evaluate)
plot = self.new_task('plot', Plot)
fit.in_training_data = training_data.out_csv
predict.in_model = fit.out_model
predict.in_predict_data = prediction_data.out_csv
evaluate.in_predict_data = prediction_data.out_csv
evaluate.in_prediction = prediction.out_prediction
plot.in_predict_data = prediction_data.out_csv
plot.in_prediction = prediction.out_prediction
plot.in_evaluation = evaluate.out_evaluation
return plot
Fit
以外の Task
からハイパーパラメータを除去することができ、本質的に入力として必要なものだけが残りましたね!これで Predict
や Evaluate
の再利用性ははるかに上がるでしょう。 ワークフローの中でどのパラメータがどのタスクの動作に本当に影響を及ぼすのかもはっきりしました。
最後に
みんなも SciLuigi 使おう!