LoginSignup
9
6

More than 5 years have passed since last update.

俺は SciLuigi で行く!

Last updated at Posted at 2018-06-22

要約

ワークフローマネージメントシステムLuigiをデータ分析に使うときに生じる様々な欠点を解消したSciLuigiというライブラリがある。便利だからみんな使おう。
フレーバーを理解してもらうだけの記事なので使い方とかは公式 doc みてね。

背景

何度かこれまでに記事にも書いた Luigi はここ数年の私のデータ分析作業の心強いお供であり続けて来ました。Luigi がデータ分析においていかに強力なツールかは他の記事による解説1に譲りますが、とにかく Luigi は「しっかりと結果の一貫性を保って計算する」ことに関するトラブルを大幅に軽減してくれ、計算処理用のプログラムを整理した形にしてくれました。

とはいえ、不満が全くないわけではありませんでした。Luigi は元々データ分析ではなく ETL 処理を運用することに主眼を置いており、トライアル&エラーを繰り返すデータ分析にはあまりそぐわない面もあります。例えば以下のような点です。

  • ワークフローの途中にある結果が変更される状況でも、場合によってはワークフロー全体が再計算されない
  • ワークフローの上流にあるタスクを簡単に切り替えることができない
  • ワークフローの上流にあるタスクのパラメータ変更が容易でない

ワークフローの途中での再計算が行われた場合にも場合によってはワークフロー全体が再計算されない

例えば A -> B -> C というワークフローがあった場合、通常 Luigi ではタスクCが完了しているかどうかをチェックし、タスクCの結果が存在していれば B や A が完了しているかはチェックされません。ただしこの問題については各タスクの complete() メソッドをオーバーライドすることで回避可能です。

ワークフローの上流にあるタスクを簡単に切り替えることができない

例えば


from luigi import ExternalTarget, Parameter, Task

class ExternalData(Task):
     def output(self):
         return ExternalTarget('/path/to/file')
     ...

class Preprocess1(Task):
    def requires(self):
        yield ExternalData()
    ...

class Preprocess2(Task):
    def requires(self):
        yield Preprocess1()
    ...

class Fit(Task):
    alpha = Parameter()
    l = Parameter()
    def requires(self):
        yield Preprocess2()
    ...

というパイプラインがあった場合に、パイプライン全体を異なるデータに対して流すには Preprocess1 の依存タスクを変える必要があります。異なるデータソースに対して流すたびに新しくPreprocess1からFitまでを再定義するのか? と考えると、これはあまりメンテナンス性が高いとは言えません。
まあこれくらいの例だったら Parameter 使えばいいじゃん、となる訳ですが、そうすると次で述べる問題にヒットします。

ワークフローの上流にあるタスクのパラメータ変更が容易でない

以下のようなタスクを考えましょう。

from luigi import ExternalTarget, Parameter, Task

class ExternalData(Task):
    def output(self):
        return ExternalTarget('/path/to/file')
    ...

class Fit(Task):
    alpha = Parameter()
    l = Parameter()
    def run(self):
        ...

この Fit を様々なパラメータで実行して予測を行い、それぞれについて結果のプロットを作成するという状況は考えらえるものかと思いますが、そのまま書くとこうなるでしょうか。

class Predict(Task):
    alpha = Parameter()
    l = Parameter()
    def requires(self):
        yield Fit(alpha=alpha, l=l)
        ...

class Evaluate(Task):
    alpha = Parameter()
    l = Parameter()
    def requires(self):
        yield Evaluate(alpha=alpha, l=l)
        ...

class Plot(Task):
    alpha = Parameter()
    l = Parameter()
    def requires(self):
        yield Predict(alpha=alpha, l=l)
        ...

Fit に渡すパラメータである alpha や l が下流のタスクでまで必要になっているのがみて取れるかと思います。もちろんこれは Plot や Evaluate の変更に対する柔軟性を著しく損なってしまいます。

SciLuigi で行こうぜ

SciLuigi は「タスクの処理定義」と「依存関係の定義」を分離することによって上記3つの問題のうち後二者の問題に対処しています。

まず一つ目の例を SciLuigi で書き直しましょう。

from sciluigi import TargetInfo, Paramter, Task, WorkflowTask

class ExternalData(Task):
    def out_csv(self):
        return TargetInfo(self, '/path/to/file')

class Preprocess1(Task):
    in_data = None
    def out_csv(self):
        return TargetInfo(self, '/path/to/preprocessed_file1')
    def run(self):
        ...

class Preprocess2(Task):
    in_data = None
    def out_csv(self):
        return TargetInfo(self, '/path/to/preprocessed_file1')
    def run(self):
        ...

class Fit(Task):
    in_data = None
    alpha = Parameter()
    l = Parameter()
    def out_pkl(self):
        return TargetInfo(self, '/path/to/pkl_model')
    def run(self):
        ...

class PreprocessAndFit(WorkflowTask):
    alpha = Parameter()
    l = Parameter()
    def workflow(self):
        data = self.new_task('external_data', ExternalData)
        perprocess1 = self.new_task('preprocess1', Preprocess1)
        perprocess2 = self.new_task('preprocess2', Preprocess2)
        fit = self.new_task('fit', Fit)
        preprocess1.in_csv = data.out_csv
        preprocess2.in_csv = preprocess1.out_csv
        fit.in_csv = preprocess2.out_csv
        return fit

Taskin_data には依存タスクそのものを書かないで済むので、入力タスクごと変更するとしても PreprocessAndFit だけを新たに定義すれば良いということになります。(普通はパラメータ使うでしょうけど、この程度なら)

では次の例に。

from sciluigi import TargetInfo, Paramter, Task, WorkflowTask

class ExternalData(Task):
    def out_csv(self):
        return TargetInfo(self, '/path/to/file')
    ...

class ExternalData2(Task):
    def out_csv(self):
        return TargetInfo(self, '/path/to/file')
    ...

class Fit(Task):
    alpha = Parameter()
    l = Parameter()
    in_training_data = None
    def out_model(self):
        ...


class Predict(Task):
    in_model = None
    in_predict_data = None

    def out_prediction(self):
        ...

class Evaluate(Task):
    in_predict_data = None
    in_prediction = None

    def out_evaluation_result(self):
        ...

class Plot(Task):
    in_predict_data = None
    in_prediction = None
    in_evaluation = None

    def out_plot(self):
        ...

class EvaluationWorkflow(WorkflowTask):
    alpha = Parameter()
    l = Parameter()

    def workflow(self):
        training_data = self.new_task('external_data', ExternalData)
        prediction_data = self.new_task('external_data2', ExternalData2)
        fit = self.new_task('fit', Fit, alpha=self.alpha, l=self.l)
        predict = self.new_task('predict', Predict)
        evaluate = self.new_task('evaluate', Evaluate)
        plot = self.new_task('plot', Plot)

        fit.in_training_data = training_data.out_csv

        predict.in_model = fit.out_model
        predict.in_predict_data = prediction_data.out_csv

        evaluate.in_predict_data = prediction_data.out_csv
        evaluate.in_prediction = prediction.out_prediction

        plot.in_predict_data = prediction_data.out_csv
        plot.in_prediction = prediction.out_prediction
        plot.in_evaluation = evaluate.out_evaluation
        return plot

Fit 以外の Task からハイパーパラメータを除去することができ、本質的に入力として必要なものだけが残りましたね!これで PredictEvaluate の再利用性ははるかに上がるでしょう。 ワークフローの中でどのパラメータがどのタスクの動作に本当に影響を及ぼすのかもはっきりしました。

最後に

みんなも  SciLuigi 使おう!

9
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
6