LoginSignup
3
2

More than 5 years have passed since last update.

Luigiよもや話 ー独自のTargetを作る、schedulerのこと、luigi.Parameter()のこと、モジュール分割ー

Last updated at Posted at 2016-11-25

Luigiの話題をいくつか

話題1:Luigiで独自のTargetを作る

依存関係のあるLuigiのタスクの処理ステータスを管理する時に、シンプルにやるならファイルベースのluigi.LocalTarget('foo.txt')を使ったりしますが、独自の基準でオレオレTargetを作りたい場合のサンプルです。

mock_target.py
# -*- coding: utf-8 -*-

import os
import luigi

class MockTarget(luigi.target.Target):

    def __init__(self, name):
        self.my_file = "./task_" + name

    def exists(self):
        return os.path.isfile(self.my_file)

    def touch(self):
        with open(self.my_file, "w") as file:
            file.write("done")

class Task1(luigi.Task):

    def requires(self):
        return Task2()

    def run(self):
        self.output().touch()

    def output(self):
        return MockTarget("t1")

class Task2(luigi.Task):

    def requires(self):
        return []

    def run(self):
        self.output().touch()

    def output(self):
        return MockTarget("t2")

if __name__ == '__main__':
    luigi.run()

ターミナルから実行

python mock_target.py Task1 --local-scheduler

luigi.LocalTargetの代わりに、MockTargetを使っています。このうち、exists()はタスクの完了をチェックするためのメソッドで、Luigiのフレームワークないから呼ばれます。touch()は見ての通りrun()から呼び出しているので、特にこの名前である必要ありません。また、run()の中でself.output()経由でMockTargetを生成していますが、つじつまさえあっていればこれも必須ではないです。

例えばこんなのでも動きます。

mock_target.py
# -*- coding: utf-8 -*-

import os
import luigi

class MockTarget(luigi.target.Target):

    def __init__(self, name):
        self.my_file = "./taskss_" + name

    def exists(self):
        return os.path.isfile(self.my_file)

    def finish(self):
        with open(self.my_file, "w") as file:
            file.write("done")

class Task1(luigi.Task):

    def requires(self):
        return Task2()

    def run(self):
        MockTarget("t1").finish()

    def output(self):
        return MockTarget("t1")


class Task2(luigi.Task):

    def requires(self):
        return []

    def run(self):
        MockTarget("t2").finish()

    def output(self):
        return MockTarget("t2")

if __name__ == '__main__':
    luigi.run()   

話題2:schedulerのこと

local-scheduler

さきほどの例では、--local-schedulerをつかってLuigiタスクを実行しています。ターミナルから手軽に動かせるので便利です。

python mock_target.py Task1 --local-scheduler

luigid

別の方法もあって、ターミナルでluigidでスケジューラを起動しておき

#とあるターミナルで
luigid

もう一つターミナルを立ち上げて、local-schedulerパラメータなしで立ち上げる方法もあります。

#別のターミナルで実行
python mock_target.py Task1

この場合、ブラウザから http://localhost:8082/static/visualiser/index.html# にアクセスすると実行中のタスクを見ることができます。

luigi1.png

依存関係も見れて便利です。
luigi2.png

話題3:luigi.Parameter()のこと

luigi.Parameter()という機能があります。公式にも説明があり、Qiitaでもちょくちょく記事があり、ライブラリにもサンプルコードあるのですが、眺めていてもさっぱり使い方がわかりませんでした。

見てもよくわからなかったサンプル

luigi/parameter.py
class MyTask(luigi.Task):
    foo = luigi.Parameter()

class RequiringTask(luigi.Task):
    def requires(self):
        return MyTask(foo="hello")

    def run(self):
        print(self.requires().foo)  # prints "hello"

よくわからないまま、いろいろ試してみます。

param_sample.py
# -*- coding: utf-8 -*-

import luigi


class Task1(luigi.Task):

    def requires(self):
        return [Task2(param1="Foo", param2="Bar"),
                Task2(param1="Baz", param2="qux")]

    def run(self):
        with self.output().open('w') as f:
            f.write('done')

    def output(self):
        return luigi.LocalTarget("./done_Task1")


class Task2(luigi.Task):

    param1 = luigi.Parameter()
    param2 = luigi.Parameter()

    def requires(self):
        return []

    def run(self):
        with self.output().open('w') as f:
            f.write('done')

    def output(self):
        filename = "./done_Task2_" +  self.param1 + "_" + self.param2
        return luigi.LocalTarget(filename)


if __name__ == '__main__':
    luigi.run()   

このモジュールを実行すると

#ターミナルで
python param_sample.py Task1 

3ファイル作られました。
done_Task1
done_Task2_Baz_qux
done_Task2_Foo_Bar
luigi3.png

Task2の下記箇所で、Task2(param1="Foo", param2="Bar")で指定した値が取れていますが、不思議なことにparam1とparam2の行を入れ替えても同じ結果になります。

もともと
param1 = luigi.Parameter()
param2 = luigi.Parameter()

これでも同じ結果
param2 = luigi.Parameter()
param1 = luigi.Parameter()

luigi.Parameter()は戻り値を受け取る変数名が何かを知っているようです。Pythonだと普通の動きなのかもしれませんが。。

話題4:Luigiモジュールの分割

よくみるサンプルだとluigi.Taskを1モジュールで書いていますが、依存関係が複雑になるとサブモジュールに分けたくなりそうなので試してみます。

main.pyとsub.pyの2モージュールを作って、main.pyのTask1がsub.pyのSub_Task3に依存するようにしてみます。

main.py
import os
import luigi
from sub import Sub_Task3

class Task1(luigi.Task):

    def requires(self):
        return [Task2(), Sub_Task3()]

    def run(self):
        with self.output().open('w') as f:
            f.write('done')

    def output(self):
        return luigi.LocalTarget("Task1")

class Task2(luigi.Task):

    def requires(self):
        return []

    def run(self):
        with self.output().open('w') as f:
            f.write('done')

    def output(self):
        return luigi.LocalTarget("Task2")

if __name__ == '__main__':
    luigi.run()
sub.py
import os
import luigi

class Sub_Task3(luigi.Task):

    def requires(self):
        return Sub_Task4()

    def run(self):
        with self.output().open('w') as f:
            f.write('done')

    def output(self):
        return luigi.LocalTarget("Sub_Task3")

class Sub_Task4(luigi.Task):

    def requires(self):
        return []

    def run(self):
        with self.output().open('w') as f:
            f.write('done')

    def output(self):
        return luigi.LocalTarget("Sub_Task4")

if __name__ == '__main__':
    luigi.run()

実行

$ python main.py Task1
中略
===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 ran successfully:
    - 1 Sub_Task3()
    - 1 Sub_Task4()
    - 1 Task1()
    - 1 Task2()

実行結果

luigi4.png

単純にモジュールを分割しただけなので、予想どおりの動作になりました。

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2