目的
Pythonでデータフローの依存関係をシンプルに記述できるライブラリであるLuigiにおいて、依存関係の根本にあるタスクにパラメータを受け渡す場合、パラメータのバケツリレーが発生してしまう現象を解決します。
実際に発生した問題
以下のDependency Graphのように、依存が途中で分岐したり、依存の途中で共通処理が存在していて、かつ、パラメータは一番上位のTaskが管理しているような処理を実現したいような状況があります。
その場合は、最も上位のTaskから該当Taskまでパラメータを受け渡ししないといけません。コードに落とすと例えば以下のようになります。
愚直なコード例
# -*- coding: utf-8 -*-
import luigi
class Task1(luigi.Task):
task_namespace = "examples"
param1 = luigi.Parameter()
is_done = False
def run(self):
print(f"Running{self.__class__.__name__} with {self.param1}")
self.is_done = True
def complete(self):
return self.is_done
class Task2(luigi.Task):
task_namespace = "examples"
param1 = luigi.Parameter()
param2 = luigi.Parameter()
def run(self):
print(f"Running {self.__class__.__name__} with {self.param2}")
self.output().open("w").close()
def requires(self):
yield Task1(param1=self.param1)
def output(self):
return luigi.LocalTarget(f"/tmp/luigi_test/task2_{self.param2}")
class Task3(luigi.Task):
task_namespace = "examples"
param1 = luigi.Parameter()
param3 = luigi.Parameter()
def run(self):
print(f"Running {self.__class__.__name__} with {self.param3}")
self.output().open("w").close()
def requires(self):
yield Task1(param1=self.param1)
def output(self):
return luigi.LocalTarget(f"/tmp/luigi_test/task3_{self.param3}")
class Task4(luigi.Task):
task_namespace = "examples"
param1 = luigi.Parameter()
param2 = luigi.Parameter()
param3 = luigi.Parameter()
def run(self):
print(f"Running {self.__class__.__name__}.")
self.output().open("w").close()
def requires(self):
yield Task2(param1=self.param1, param2=self.param2)
yield Task3(param1=self.param1, param3=self.param3)
def output(self):
return luigi.LocalTarget(f"/tmp/luigi_test/task4_{self.param1}_{self.param2}")
class Task5(luigi.Task):
task_namespace = "examples"
def requires(self):
for p in range(5):
yield Task4(param1=0, param2=p, param3=p ** 2)
ただ依存元に渡したいだけのパラメータを、直接使わないTaskでもいちいち宣言せねばならず、単純にコード量が増えるだけでなく、パラメータの受け渡し間違いが発生するバグの温床となる恐れがあります。
解決方法
luigi.utils
の@inherits
と@requires
を利用します。
より安全なコード例
# -*- coding: utf-8 -*-
import luigi
from luigi.util import inherits, requires
class Task1(luigi.Task):
task_namespace = "examples"
param1 = luigi.Parameter()
is_done = False
def run(self):
print(f"Running{self.__class__.__name__} with {self.param1}")
self.is_done = True
def complete(self):
return self.is_done
@requires(Task1)
class Task2(luigi.Task):
task_namespace = "examples"
param2 = luigi.Parameter()
def run(self):
print(f"Running {self.__class__.__name__} with {self.param2}")
self.output().open("w").close()
def output(self):
return luigi.LocalTarget(f"/tmp/luigi_test/task2_{self.param2}")
@requires(Task1)
class Task3(luigi.Task):
task_namespace = "examples"
param3 = luigi.Parameter()
def run(self):
print(f"Running {self.__class__.__name__} with {self.param3}")
self.output().open("w").close()
def output(self):
return luigi.LocalTarget(f"/tmp/luigi_test/task3_{self.param3}")
@inherits(Task2)
@inherits(Task3)
class Task4(luigi.Task):
task_namespace = "examples"
def run(self):
print(f"Running {self.__class__.__name__}.")
self.output().open("w").close()
def requires(self):
yield self.clone(Task2)
yield self.clone(Task3)
def output(self):
return luigi.LocalTarget(f"/tmp/luigi_test/task4_{self.param1}_{self.param2}")
class Task5(luigi.Task):
task_namespace = "examples"
def requires(self):
for p in range(5):
yield Task4(param1=0, param2=p, param3=p ** 2)
@requires
を呼ぶことで、自身が利用するParameterのみを宣言すればよくなり、かつ、def requires(self):
をOverrideする必要がなくなり、type量も減ります。渡すべきパラメータはdecorator側でよしなにしてくれるので、間違いも減ります。ただ、Task2とTask3の両方に依存したTask4のようなケースでは@requires
を利用することができませんが、その代わり、@inherits
を重ねがけすることで、同様なことが実現できます。
この複数のTaskに@requires
を適用するためのPRが存在するのですが、2年近く放置されているので、時間ができたらPR出し直してみます!