LoginSignup
5
5

More than 5 years have passed since last update.

Luigiを扱う上での注意点

Last updated at Posted at 2017-02-01

Luigi:2.5.0
python:3.6

Luigiを扱う上での注意点

LuigiはWindows環境では並列処理はできない問題を回避されている記事があります。
windows環境でluigiに無理やり並列処理をさせる

requires及びrunの実行回数について

ジェネレータの扱いに問題があり、依存タスクを返却するタスクのrequiresメソッドとrunメソッドはスケジューラから複数回呼び出されます。
これは、即ち、runやrequiresに書いた処理は状況によっては、複数回実行されることになります。
そのため、コストの高い処理や、外部に影響を及ぼす処理は依存タスクを返却するメソッドには書かない方が無難です。

依存元タスクから依存先タスクの処理に遷移した際に、依存元タスクのジェネレータオブジェクトを上書きした上、依存元タスクに戻ってきたときに新しくジェネレータオブジェクトを取得しなおしているので毎回、メソッドの先頭から再開されます。
https://github.com/mtoriumi/luigi/blob/5678b6119ed260e8fb43410675be6d6daea445d1/luigi/worker.py#L130

regenerated_task_gen.gif

Sample:

from luigi import Task, run
from luigi.mock import MockTarget
from inspect import currentframe


class DependentTask(Task):

    def output(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        return MockTarget('out.txt')

    def run(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        with self.output().open('w') as fout:
            fout.write('DependentTask is succeeded')

    def on_success(self):
        print("Reached {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))

    def on_failure(self, exception):
        print("Reached {}.{} {}".format(self.__class__.__name__, currentframe().f_code.co_name, exception))


class StartTask(Task):

    def output(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        return MockTarget("StartTaskOut.txt")

    def run(self):
        print("running {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))
        with self.output().open('w') as fout:
            fout.write('StartTask is succeeded')
        yield DependentTask()

    def on_success(self):
        print("Reached {}.{}".format(self.__class__.__name__, currentframe().f_code.co_name))

    def on_failure(self, exception):
        print("Reached {}.{} {}".format(self.__class__.__name__, currentframe().f_code.co_name, exception))


if __name__ == '__main__':
    run(main_task_cls=StartTask)

Output:

running StartTask.output
running StartTask.run
running StartTask.output
running DependentTask.output
running DependentTask.output
running DependentTask.run
running DependentTask.output
Reached DependentTask.on_success
running StartTask.run
running StartTask.output
running DependentTask.output
running DependentTask.output
Reached StartTask.on_success

タスクごとのリトライ回数制限について

タスクごとのリトライ回数指定である、retry_countの処理にはバグがあり、2回目以降のリトライ時、スケジューラのリトライ回数制限の設定でうわがかれます。現状、まともに動いていないため、使用しないことをお勧めします。
原因は下記の2回目以降のタスク登録箇所でリトライポリシー(retry_policy_dict)を指定していないからです。
https://github.com/spotify/luigi/blob/b33aa3033405bfe41d9f5a806d70fb8e98214d86/luigi/worker.py#L974-L984

同様の問題は過去にも以下で質問されています。
http://stackoverflow.com/questions/39595786/luigi-per-task-retry-policy

こちらは、現在、プルリクエストを送信しており、レビュー待ちの問題です。
https://github.com/spotify/luigi/pull/2012

マージされました。

luigid(セントラルスケジューラ)の設定について

luigidを起動する際も、LUIGI_CONFIG_PATH環境変数の指定が必要。
luigidは独立して動くので、schedulerセクションの内容を反映させたければ、luigid起動時に設定ファイルを指定しないといけない。

タスクのメソッドの処理順について

タスクは、output() => requires => run()の順番で処理されます。

パラメータのコマンドラインからの引き渡し

パラメータをコマンドラインから引き渡す際に、パラメータ名のアンダースコアはハイフンに変更される。

パラメータ引き取り時の型変換

DictParameterでパラメータを引き取ると、FrozenOrderedDictという独自の型にされる。
そのため、組み込み型で使用できていた一部メソッドが使えなくなる。

パラメータに渡して良い値

並列タスク間では、組み込み型以外のパラメータは引き回せない。
オブジェクトを与えた場合は、クラス名文字列に変換される。
直列だと引き渡せちゃう。
これは、並列タスクが複数のプロセスで動作する仕様で、パラメータが一旦、JSON形式に変換されることによるもの。

タスクの終了条件

タスクが終了する条件は、次の二つ。

  • outputで指定したターゲットへのファイル作成=ターゲットのopen/close(タスクの成功)
  • タスク内における例外発生(タスクの失敗)

上記のいずれかが満たされない場合、そのタスクは殺されるまで永遠に終わらない。

ターゲットのopen中の例外発生

ターゲットを書き込みモードでopenした状態で例外が発生すると、File busyで一時書き込みファイルがゴミとして残るため、ターゲットをopenしたあとに例外が発生し得る処理を書いてはならない。

5
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
5