Luigiの話題をいくつか
話題1:Luigiで独自のTargetを作る
依存関係のあるLuigiのタスクの処理ステータスを管理する時に、シンプルにやるならファイルベースのluigi.LocalTarget('foo.txt')を使ったりしますが、独自の基準でオレオレTargetを作りたい場合のサンプルです。
# -*- coding: utf-8 -*-
import os
import luigi
class MockTarget(luigi.target.Target):
def __init__(self, name):
self.my_file = "./task_" + name
def exists(self):
return os.path.isfile(self.my_file)
def touch(self):
with open(self.my_file, "w") as file:
file.write("done")
class Task1(luigi.Task):
def requires(self):
return Task2()
def run(self):
self.output().touch()
def output(self):
return MockTarget("t1")
class Task2(luigi.Task):
def requires(self):
return []
def run(self):
self.output().touch()
def output(self):
return MockTarget("t2")
if __name__ == '__main__':
luigi.run()
ターミナルから実行
python mock_target.py Task1 --local-scheduler
luigi.LocalTargetの代わりに、MockTargetを使っています。このうち、exists()はタスクの完了をチェックするためのメソッドで、Luigiのフレームワークないから呼ばれます。touch()は見ての通りrun()から呼び出しているので、特にこの名前である必要ありません。また、run()の中でself.output()経由でMockTargetを生成していますが、つじつまさえあっていればこれも必須ではないです。
例えばこんなのでも動きます。
# -*- coding: utf-8 -*-
import os
import luigi
class MockTarget(luigi.target.Target):
def __init__(self, name):
self.my_file = "./taskss_" + name
def exists(self):
return os.path.isfile(self.my_file)
def finish(self):
with open(self.my_file, "w") as file:
file.write("done")
class Task1(luigi.Task):
def requires(self):
return Task2()
def run(self):
MockTarget("t1").finish()
def output(self):
return MockTarget("t1")
class Task2(luigi.Task):
def requires(self):
return []
def run(self):
MockTarget("t2").finish()
def output(self):
return MockTarget("t2")
if __name__ == '__main__':
luigi.run()
話題2:schedulerのこと
local-scheduler
さきほどの例では、--local-schedulerをつかってLuigiタスクを実行しています。ターミナルから手軽に動かせるので便利です。
python mock_target.py Task1 --local-scheduler
luigid
別の方法もあって、ターミナルでluigidでスケジューラを起動しておき
#とあるターミナルで
luigid
もう一つターミナルを立ち上げて、local-schedulerパラメータなしで立ち上げる方法もあります。
#別のターミナルで実行
python mock_target.py Task1
この場合、ブラウザから http://localhost:8082/static/visualiser/index.html# にアクセスすると実行中のタスクを見ることができます。
話題3:luigi.Parameter()のこと
luigi.Parameter()という機能があります。公式にも説明があり、Qiitaでもちょくちょく記事があり、ライブラリにもサンプルコードあるのですが、眺めていてもさっぱり使い方がわかりませんでした。
見てもよくわからなかったサンプル
class MyTask(luigi.Task):
foo = luigi.Parameter()
class RequiringTask(luigi.Task):
def requires(self):
return MyTask(foo="hello")
def run(self):
print(self.requires().foo) # prints "hello"
よくわからないまま、いろいろ試してみます。
# -*- coding: utf-8 -*-
import luigi
class Task1(luigi.Task):
def requires(self):
return [Task2(param1="Foo", param2="Bar"),
Task2(param1="Baz", param2="qux")]
def run(self):
with self.output().open('w') as f:
f.write('done')
def output(self):
return luigi.LocalTarget("./done_Task1")
class Task2(luigi.Task):
param1 = luigi.Parameter()
param2 = luigi.Parameter()
def requires(self):
return []
def run(self):
with self.output().open('w') as f:
f.write('done')
def output(self):
filename = "./done_Task2_" + self.param1 + "_" + self.param2
return luigi.LocalTarget(filename)
if __name__ == '__main__':
luigi.run()
このモジュールを実行すると
#ターミナルで
python param_sample.py Task1
3ファイル作られました。
done_Task1
done_Task2_Baz_qux
done_Task2_Foo_Bar
Task2の下記箇所で、Task2(param1="Foo", param2="Bar")で指定した値が取れていますが、不思議なことにparam1とparam2の行を入れ替えても同じ結果になります。
もともと
param1 = luigi.Parameter()
param2 = luigi.Parameter()
これでも同じ結果
param2 = luigi.Parameter()
param1 = luigi.Parameter()
luigi.Parameter()は戻り値を受け取る変数名が何かを知っているようです。Pythonだと普通の動きなのかもしれませんが。。
話題4:Luigiモジュールの分割
よくみるサンプルだとluigi.Taskを1モジュールで書いていますが、依存関係が複雑になるとサブモジュールに分けたくなりそうなので試してみます。
main.pyとsub.pyの2モージュールを作って、main.pyのTask1がsub.pyのSub_Task3に依存するようにしてみます。
import os
import luigi
from sub import Sub_Task3
class Task1(luigi.Task):
def requires(self):
return [Task2(), Sub_Task3()]
def run(self):
with self.output().open('w') as f:
f.write('done')
def output(self):
return luigi.LocalTarget("Task1")
class Task2(luigi.Task):
def requires(self):
return []
def run(self):
with self.output().open('w') as f:
f.write('done')
def output(self):
return luigi.LocalTarget("Task2")
if __name__ == '__main__':
luigi.run()
import os
import luigi
class Sub_Task3(luigi.Task):
def requires(self):
return Sub_Task4()
def run(self):
with self.output().open('w') as f:
f.write('done')
def output(self):
return luigi.LocalTarget("Sub_Task3")
class Sub_Task4(luigi.Task):
def requires(self):
return []
def run(self):
with self.output().open('w') as f:
f.write('done')
def output(self):
return luigi.LocalTarget("Sub_Task4")
if __name__ == '__main__':
luigi.run()
実行
$ python main.py Task1
中略
===== Luigi Execution Summary =====
Scheduled 4 tasks of which:
* 4 ran successfully:
- 1 Sub_Task3()
- 1 Sub_Task4()
- 1 Task1()
- 1 Task2()
実行結果
単純にモジュールを分割しただけなので、予想どおりの動作になりました。