Luigiよもや話 ー独自のTargetを作る、schedulerのこと、luigi.Parameter()のこと、モジュール分割ー

More than 1 year has passed since last update.

Luigiの話題をいくつか

話題1:Luigiで独自のTargetを作る

依存関係のあるLuigiのタスクの処理ステータスを管理する時に、シンプルにやるならファイルベースのluigi.LocalTarget('foo.txt')を使ったりしますが、独自の基準でオレオレTargetを作りたい場合のサンプルです。

mock_target.py
# -*- coding: utf-8 -*-

import os
import luigi

class MockTarget(luigi.target.Target):

    def __init__(self, name):
        self.my_file = "./task_" + name

    def exists(self):
        return os.path.isfile(self.my_file)

    def touch(self):
        with open(self.my_file, "w") as file:
            file.write("done")

class Task1(luigi.Task):

    def requires(self):
        return Task2()

    def run(self):
        self.output().touch()

    def output(self):
        return MockTarget("t1")

class Task2(luigi.Task):

    def requires(self):
        return []

    def run(self):
        self.output().touch()

    def output(self):
        return MockTarget("t2")

if __name__ == '__main__':
    luigi.run()

ターミナルから実行

python mock_target.py Task1 --local-scheduler

luigi.LocalTargetの代わりに、MockTargetを使っています。このうち、exists()はタスクの完了をチェックするためのメソッドで、Luigiのフレームワークないから呼ばれます。touch()は見ての通りrun()から呼び出しているので、特にこの名前である必要ありません。また、run()の中でself.output()経由でMockTargetを生成していますが、つじつまさえあっていればこれも必須ではないです。

例えばこんなのでも動きます。

mock_target.py
# -*- coding: utf-8 -*-

import os
import luigi

class MockTarget(luigi.target.Target):

    def __init__(self, name):
        self.my_file = "./taskss_" + name

    def exists(self):
        return os.path.isfile(self.my_file)

    def finish(self):
        with open(self.my_file, "w") as file:
            file.write("done")

class Task1(luigi.Task):

    def requires(self):
        return Task2()

    def run(self):
        MockTarget("t1").finish()

    def output(self):
        return MockTarget("t1")


class Task2(luigi.Task):

    def requires(self):
        return []

    def run(self):
        MockTarget("t2").finish()

    def output(self):
        return MockTarget("t2")

if __name__ == '__main__':
    luigi.run()   

話題2:schedulerのこと

local-scheduler

さきほどの例では、--local-schedulerをつかってLuigiタスクを実行しています。ターミナルから手軽に動かせるので便利です。

python mock_target.py Task1 --local-scheduler

luigid

別の方法もあって、ターミナルでluigidでスケジューラを起動しておき

#とあるターミナルで
luigid

もう一つターミナルを立ち上げて、local-schedulerパラメータなしで立ち上げる方法もあります。

#別のターミナルで実行
python mock_target.py Task1

この場合、ブラウザから http://localhost:8082/static/visualiser/index.html# にアクセスすると実行中のタスクを見ることができます。

luigi1.png

依存関係も見れて便利です。
luigi2.png

話題3:luigi.Parameter()のこと

luigi.Parameter()という機能があります。公式にも説明があり、Qiitaでもちょくちょく記事があり、ライブラリにもサンプルコードあるのですが、眺めていてもさっぱり使い方がわかりませんでした。

見てもよくわからなかったサンプル

luigi/parameter.py
class MyTask(luigi.Task):
    foo = luigi.Parameter()

class RequiringTask(luigi.Task):
    def requires(self):
        return MyTask(foo="hello")

    def run(self):
        print(self.requires().foo)  # prints "hello"

よくわからないまま、いろいろ試してみます。

param_sample.py
# -*- coding: utf-8 -*-

import luigi


class Task1(luigi.Task):

    def requires(self):
        return [Task2(param1="Foo", param2="Bar"),
                Task2(param1="Baz", param2="qux")]

    def run(self):
        with self.output().open('w') as f:
            f.write('done')

    def output(self):
        return luigi.LocalTarget("./done_Task1")


class Task2(luigi.Task):

    param1 = luigi.Parameter()
    param2 = luigi.Parameter()

    def requires(self):
        return []

    def run(self):
        with self.output().open('w') as f:
            f.write('done')

    def output(self):
        filename = "./done_Task2_" +  self.param1 + "_" + self.param2
        return luigi.LocalTarget(filename)


if __name__ == '__main__':
    luigi.run()   

このモジュールを実行すると

#ターミナルで
python param_sample.py Task1 

3ファイル作られました。
done_Task1
done_Task2_Baz_qux
done_Task2_Foo_Bar
luigi3.png

Task2の下記箇所で、Task2(param1="Foo", param2="Bar")で指定した値が取れていますが、不思議なことにparam1とparam2の行を入れ替えても同じ結果になります。

もともと
param1 = luigi.Parameter()
param2 = luigi.Parameter()

これでも同じ結果
param2 = luigi.Parameter()
param1 = luigi.Parameter()

luigi.Parameter()は戻り値を受け取る変数名が何かを知っているようです。Pythonだと普通の動きなのかもしれませんが。。

話題4:Luigiモジュールの分割

よくみるサンプルだとluigi.Taskを1モジュールで書いていますが、依存関係が複雑になるとサブモジュールに分けたくなりそうなので試してみます。

main.pyとsub.pyの2モージュールを作って、main.pyのTask1がsub.pyのSub_Task3に依存するようにしてみます。

main.py
import os
import luigi
from sub import Sub_Task3

class Task1(luigi.Task):

    def requires(self):
        return [Task2(), Sub_Task3()]

    def run(self):
        with self.output().open('w') as f:
            f.write('done')

    def output(self):
        return luigi.LocalTarget("Task1")

class Task2(luigi.Task):

    def requires(self):
        return []

    def run(self):
        with self.output().open('w') as f:
            f.write('done')

    def output(self):
        return luigi.LocalTarget("Task2")

if __name__ == '__main__':
    luigi.run()
sub.py
import os
import luigi

class Sub_Task3(luigi.Task):

    def requires(self):
        return Sub_Task4()

    def run(self):
        with self.output().open('w') as f:
            f.write('done')

    def output(self):
        return luigi.LocalTarget("Sub_Task3")

class Sub_Task4(luigi.Task):

    def requires(self):
        return []

    def run(self):
        with self.output().open('w') as f:
            f.write('done')

    def output(self):
        return luigi.LocalTarget("Sub_Task4")

if __name__ == '__main__':
    luigi.run()

実行

$ python main.py Task1
中略
===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 ran successfully:
    - 1 Sub_Task3()
    - 1 Sub_Task4()
    - 1 Task1()
    - 1 Task2()

実行結果

luigi4.png

単純にモジュールを分割しただけなので、予想どおりの動作になりました。