LoginSignup
12
4

More than 5 years have passed since last update.

Luigiでタスク失敗時のコマンド戻り値を設定する

Posted at

Luigiでタスクが失敗したことをコマンドの返り値で判断したいときの方法。

cronからキックされる用のラッパースクリプトや、ジョブ管理ツールから起動する場合に必要となる。

ネットを調べたがあまり情報が載っていなかったので、自分で調べた結果をまとめておく。

モチベーション

Luigiはデフォルトだと、タスクが失敗してもコマンドの戻り値が0である。
そのため、戻り値によるエラーハンドリング等ができないし、Jenkins等からキックしている場合タスクが失敗しているのにJenkinsのジョブとしてはSUCCESSになってしまう。

以下、例としてluigiのexamplesにあるhello_world.pyを少し書き換えて、0除算例外を発生させるようにしておく。

$ git diff
diff --git a/examples/hello_world.py b/examples/hello_world.py
index c99c252..d8adfa5 100644
--- a/examples/hello_world.py
+++ b/examples/hello_world.py
@@ -5,7 +5,8 @@ class HelloWorldTask(luigi.Task):
     task_namespace = 'examples'

     def run(self):
-        print("{task} says: Hello world!".format(task=self.__class__.__name__))
+        #print("{task} says: Hello world!".format(task=self.__class__.__name__))
+        return 1/0

 if __name__ == '__main__':
     luigi.run(['examples.HelloWorldTask', '--workers', '1', '--local-scheduler'])

これを実行するとタスクは失敗するが戻り値は0でタスク成功時と区別がつかない。

$ PYTHONPATH=examples luigi --module hello_world examples.HelloWorldTask

(...snip...)

Traceback (most recent call last):
  File "/Users/bwtakacy/.pyenv/versions/anaconda3-4.1.1/lib/python3.5/site-packages/luigi/worker.py", line 192, in run
    new_deps = self._run_get_new_deps()
  File "/Users/bwtakacy/.pyenv/versions/anaconda3-4.1.1/lib/python3.5/site-packages/luigi/worker.py", line 130, in _run_get_new_deps
    task_gen = self.task.run()
  File "/Users/bwtakacy/Develop/luigi/examples/hello_world.py", line 9, in run
    return 1/0
ZeroDivisionError: division by zero
DEBUG: 1 running tasks, waiting for next task to finish
/Users/bwtakacy/.pyenv/versions/anaconda3-4.1.1/lib/python3.5/site-packages/luigi/parameter.py:259: UserWarning: Parameter None is not of type string.
  warnings.warn("Parameter {0} is not of type string.".format(str(x)))

(...snip...)

INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 failed:
    - 1 examples.HelloWorldTask()

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

$ echo $?
0

解決方法

luigi.cfgにてretcode設定をし、luigiコマンド実行時に読み込ませるようにすればよい。

retcode設定についてはドキュメントをよく読むとConfiguration

We recommend that you copy this set of exit codes to your luigi.cfg file:

とあって、以下の設定例が記載されている。(デフォルトの設定を変えて欲しい。。。)

[retcode]
# The following return codes are the recommended exit codes for Luigi
# They are in increasing level of severity (for most applications)
already_running=10
missing_data=20
not_run=25
task_failed=30
scheduling_error=35
unhandled_exception=40

これにより、例えばタスクが失敗した場合にはluigiコマンドの戻り値が30になる。

luigi.cfgはドキュメントによると

  • /etc/luigi/client.cfg
  • カレントディレクトリ
  • 環境変数 LUIGI_CONFIG_PATH で指定したパス

のどこかにあればよく、下のものほど優先度が高くなる。

以下はカレントディレクトリに置いた例。

$ cat luigi.cfg 
[retcode]
# The following return codes are the recommended exit codes for Luigi
# They are in increasing level of severity (for most applications)
already_running=10
missing_data=20
not_run=25
task_failed=30
scheduling_error=35
unhandled_exception=40
$ PYTHONPATH=examples luigi --module hello_world examples.HelloWorldTask

(...snip...)

Traceback (most recent call last):
  File "/Users/bwtakacy/.pyenv/versions/anaconda3-4.1.1/lib/python3.5/site-packages/luigi/worker.py", line 192, in run
    new_deps = self._run_get_new_deps()
  File "/Users/bwtakacy/.pyenv/versions/anaconda3-4.1.1/lib/python3.5/site-packages/luigi/worker.py", line 130, in _run_get_new_deps
    task_gen = self.task.run()
  File "/Users/bwtakacy/Develop/luigi/examples/hello_world.py", line 9, in run
    return 1/0
ZeroDivisionError: division by zero

(...snip...)

INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 failed:
    - 1 examples.HelloWorldTask()

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

$ echo $?
30

pythonコマンドでLuigiタスクを起動している場合の注意点

luigiコマンドを使ってLuigiタスクを起動している場合はretcode設定をするだけで良いのだが、pythonコマンドで起動している場合には以下の点も気をつける必要がある。

pythonコマンドから起動した際の挙動としてLuigiタスクを記述したpythonスクリプトファイルにて

if __name__ == "__main__":
    luigi.run()

と書いてあるとダメ。

import luigi.cmdline

if __name__ == "__main__":
    luigi.cmdline.luigi_run()

にしなければならない。

Luigiのドキュメントやexamplesのサンプルスクリプトが軒並みluigi.runを使っているので全く気がつかなかった。

ほんとLuigiさんはコードを読め圧力が強くて困る。

luigi.cmdline.luigi_runluigi.runを呼び出し、その中で例外やエラーが起きた場合にはretcode設定に従って戻り値を設定する役割を行なっている。

luigiコマンドの正体

ちなみに、luigiコマンドの正体はluigi.cmdline.luigi_runを呼び出している単なるラッパースクリプトである。

bin/luigi
#!/usr/bin/env python

import sys
import warnings
import luigi.cmdline


def main(argv):
    warnings.warn("'bin/luigi' has moved to console script 'luigi'", DeprecationWarning)
    luigi.cmdline.luigi_run(argv)


if __name__ == '__main__':
    main(sys.argv[1:])

以上。

12
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
4