Luigiでタスクが失敗したことをコマンドの返り値で判断したいときの方法。
cronからキックされる用のラッパースクリプトや、ジョブ管理ツールから起動する場合に必要となる。
ネットを調べたがあまり情報が載っていなかったので、自分で調べた結果をまとめておく。
モチベーション
Luigiはデフォルトだと、タスクが失敗してもコマンドの戻り値が0である。
そのため、戻り値によるエラーハンドリング等ができないし、Jenkins等からキックしている場合タスクが失敗しているのにJenkinsのジョブとしてはSUCCESSになってしまう。
以下、例としてluigiのexamplesにあるhello_world.pyを少し書き換えて、0除算例外を発生させるようにしておく。
$ git diff
diff --git a/examples/hello_world.py b/examples/hello_world.py
index c99c252..d8adfa5 100644
--- a/examples/hello_world.py
+++ b/examples/hello_world.py
@@ -5,7 +5,8 @@ class HelloWorldTask(luigi.Task):
task_namespace = 'examples'
def run(self):
- print("{task} says: Hello world!".format(task=self.__class__.__name__))
+ #print("{task} says: Hello world!".format(task=self.__class__.__name__))
+ return 1/0
if __name__ == '__main__':
luigi.run(['examples.HelloWorldTask', '--workers', '1', '--local-scheduler'])
これを実行するとタスクは失敗するが戻り値は0でタスク成功時と区別がつかない。
$ PYTHONPATH=examples luigi --module hello_world examples.HelloWorldTask
(...snip...)
Traceback (most recent call last):
File "/Users/bwtakacy/.pyenv/versions/anaconda3-4.1.1/lib/python3.5/site-packages/luigi/worker.py", line 192, in run
new_deps = self._run_get_new_deps()
File "/Users/bwtakacy/.pyenv/versions/anaconda3-4.1.1/lib/python3.5/site-packages/luigi/worker.py", line 130, in _run_get_new_deps
task_gen = self.task.run()
File "/Users/bwtakacy/Develop/luigi/examples/hello_world.py", line 9, in run
return 1/0
ZeroDivisionError: division by zero
DEBUG: 1 running tasks, waiting for next task to finish
/Users/bwtakacy/.pyenv/versions/anaconda3-4.1.1/lib/python3.5/site-packages/luigi/parameter.py:259: UserWarning: Parameter None is not of type string.
warnings.warn("Parameter {0} is not of type string.".format(str(x)))
(...snip...)
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 failed:
- 1 examples.HelloWorldTask()
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
$ echo $?
0
解決方法
luigi.cfgにてretcode設定をし、luigiコマンド実行時に読み込ませるようにすればよい。
retcode設定についてはドキュメントをよく読むとConfigurationに
We recommend that you copy this set of exit codes to your luigi.cfg file:
とあって、以下の設定例が記載されている。(デフォルトの設定を変えて欲しい。。。)
[retcode]
# The following return codes are the recommended exit codes for Luigi
# They are in increasing level of severity (for most applications)
already_running=10
missing_data=20
not_run=25
task_failed=30
scheduling_error=35
unhandled_exception=40
これにより、例えばタスクが失敗した場合にはluigiコマンドの戻り値が30になる。
luigi.cfgはドキュメントによると
- /etc/luigi/client.cfg
- カレントディレクトリ
- 環境変数 LUIGI_CONFIG_PATH で指定したパス
のどこかにあればよく、下のものほど優先度が高くなる。
以下はカレントディレクトリに置いた例。
$ cat luigi.cfg
[retcode]
# The following return codes are the recommended exit codes for Luigi
# They are in increasing level of severity (for most applications)
already_running=10
missing_data=20
not_run=25
task_failed=30
scheduling_error=35
unhandled_exception=40
$ PYTHONPATH=examples luigi --module hello_world examples.HelloWorldTask
(...snip...)
Traceback (most recent call last):
File "/Users/bwtakacy/.pyenv/versions/anaconda3-4.1.1/lib/python3.5/site-packages/luigi/worker.py", line 192, in run
new_deps = self._run_get_new_deps()
File "/Users/bwtakacy/.pyenv/versions/anaconda3-4.1.1/lib/python3.5/site-packages/luigi/worker.py", line 130, in _run_get_new_deps
task_gen = self.task.run()
File "/Users/bwtakacy/Develop/luigi/examples/hello_world.py", line 9, in run
return 1/0
ZeroDivisionError: division by zero
(...snip...)
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 failed:
- 1 examples.HelloWorldTask()
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
$ echo $?
30
pythonコマンドでLuigiタスクを起動している場合の注意点
luigiコマンドを使ってLuigiタスクを起動している場合はretcode設定をするだけで良いのだが、pythonコマンドで起動している場合には以下の点も気をつける必要がある。
pythonコマンドから起動した際の挙動としてLuigiタスクを記述したpythonスクリプトファイルにて
if __name__ == "__main__":
luigi.run()
と書いてあるとダメ。
import luigi.cmdline
if __name__ == "__main__":
luigi.cmdline.luigi_run()
にしなければならない。
Luigiのドキュメントやexamplesのサンプルスクリプトが軒並みluigi.run
を使っているので全く気がつかなかった。
ほんとLuigiさんはコードを読め圧力が強くて困る。
luigi.cmdline.luigi_run
はluigi.run
を呼び出し、その中で例外やエラーが起きた場合にはretcode設定に従って戻り値を設定する役割を行なっている。
luigiコマンドの正体
ちなみに、luigiコマンドの正体はluigi.cmdline.luigi_run
を呼び出している単なるラッパースクリプトである。
#!/usr/bin/env python
import sys
import warnings
import luigi.cmdline
def main(argv):
warnings.warn("'bin/luigi' has moved to console script 'luigi'", DeprecationWarning)
luigi.cmdline.luigi_run(argv)
if __name__ == '__main__':
main(sys.argv[1:])
以上。