14
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Luigiでのパラメータの受け渡しを簡単にする

Last updated at Posted at 2018-07-23

目的

Pythonでデータフローの依存関係をシンプルに記述できるライブラリであるLuigiにおいて、依存関係の根本にあるタスクにパラメータを受け渡す場合、パラメータのバケツリレーが発生してしまう現象を解決します。

実際に発生した問題

以下のDependency Graphのように、依存が途中で分岐したり、依存の途中で共通処理が存在していて、かつ、パラメータは一番上位のTaskが管理しているような処理を実現したいような状況があります。

image.png

その場合は、最も上位のTaskから該当Taskまでパラメータを受け渡ししないといけません。コードに落とすと例えば以下のようになります。

愚直なコード例

# -*- coding: utf-8 -*-
import luigi


class Task1(luigi.Task):
    task_namespace = "examples"
    param1 = luigi.Parameter()
    is_done = False

    def run(self):
        print(f"Running{self.__class__.__name__} with {self.param1}")
        self.is_done = True

    def complete(self):
        return self.is_done


class Task2(luigi.Task):
    task_namespace = "examples"
    param1 = luigi.Parameter()
    param2 = luigi.Parameter()

    def run(self):
        print(f"Running {self.__class__.__name__} with {self.param2}")
        self.output().open("w").close()

    def requires(self):
        yield Task1(param1=self.param1)

    def output(self):
        return luigi.LocalTarget(f"/tmp/luigi_test/task2_{self.param2}")


class Task3(luigi.Task):
    task_namespace = "examples"
    param1 = luigi.Parameter()
    param3 = luigi.Parameter()

    def run(self):
        print(f"Running {self.__class__.__name__} with {self.param3}")
        self.output().open("w").close()

    def requires(self):
        yield Task1(param1=self.param1)

    def output(self):
        return luigi.LocalTarget(f"/tmp/luigi_test/task3_{self.param3}")


class Task4(luigi.Task):
    task_namespace = "examples"
    param1 = luigi.Parameter()
    param2 = luigi.Parameter()
    param3 = luigi.Parameter()

    def run(self):
        print(f"Running {self.__class__.__name__}.")
        self.output().open("w").close()

    def requires(self):
        yield Task2(param1=self.param1, param2=self.param2)
        yield Task3(param1=self.param1, param3=self.param3)

    def output(self):
        return luigi.LocalTarget(f"/tmp/luigi_test/task4_{self.param1}_{self.param2}")


class Task5(luigi.Task):
    task_namespace = "examples"

    def requires(self):
        for p in range(5):
            yield Task4(param1=0, param2=p, param3=p ** 2)

ただ依存元に渡したいだけのパラメータを、直接使わないTaskでもいちいち宣言せねばならず、単純にコード量が増えるだけでなく、パラメータの受け渡し間違いが発生するバグの温床となる恐れがあります。

解決方法

luigi.utils@inherits@requiresを利用します。

より安全なコード例

# -*- coding: utf-8 -*-
import luigi
from luigi.util import inherits, requires


class Task1(luigi.Task):
    task_namespace = "examples"
    param1 = luigi.Parameter()
    is_done = False

    def run(self):
        print(f"Running{self.__class__.__name__} with {self.param1}")
        self.is_done = True

    def complete(self):
        return self.is_done


@requires(Task1)
class Task2(luigi.Task):
    task_namespace = "examples"
    param2 = luigi.Parameter()

    def run(self):
        print(f"Running {self.__class__.__name__} with {self.param2}")
        self.output().open("w").close()

    def output(self):
        return luigi.LocalTarget(f"/tmp/luigi_test/task2_{self.param2}")


@requires(Task1)
class Task3(luigi.Task):
    task_namespace = "examples"
    param3 = luigi.Parameter()

    def run(self):
        print(f"Running {self.__class__.__name__} with {self.param3}")
        self.output().open("w").close()

    def output(self):
        return luigi.LocalTarget(f"/tmp/luigi_test/task3_{self.param3}")


@inherits(Task2)
@inherits(Task3)
class Task4(luigi.Task):
    task_namespace = "examples"

    def run(self):
        print(f"Running {self.__class__.__name__}.")
        self.output().open("w").close()

    def requires(self):
        yield self.clone(Task2)
        yield self.clone(Task3)

    def output(self):
        return luigi.LocalTarget(f"/tmp/luigi_test/task4_{self.param1}_{self.param2}")


class Task5(luigi.Task):
    task_namespace = "examples"

    def requires(self):
        for p in range(5):
            yield Task4(param1=0, param2=p, param3=p ** 2)

@requiresを呼ぶことで、自身が利用するParameterのみを宣言すればよくなり、かつ、def requires(self):をOverrideする必要がなくなり、type量も減ります。渡すべきパラメータはdecorator側でよしなにしてくれるので、間違いも減ります。ただ、Task2とTask3の両方に依存したTask4のようなケースでは@requiresを利用することができませんが、その代わり、@inheritsを重ねがけすることで、同様なことが実現できます。

この複数のTaskに@requiresを適用するためのPRが存在するのですが、2年近く放置されているので、時間ができたらPR出し直してみます!

参考

  1. requiresとinheritsの使い方
  2. inheritsの複数使用
  3. 複数requiresのPR
14
11
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?