この記事について
この記事では、 Python 上で動作するワークフロー管理フレームワーク Luigi をデータ分析に利用した際の経験を踏まえて、このようにワークフローを書けばメンテナンスが楽になるのではないか? と考えたやり方を述べています。
以下で述べていくことを不適当だと考えられる方もいらっしゃるかもしれませんが、そもそもこういったことがあまり議論されていないように見受けらますので、議論の種にでもなればと考えています。
Luigi が如何なるツールか、どういった利点があるのか、基本的な使い方はどういったものなのか、といったことについては触れません。Qiita の中でも既に記事を書かれている方がいらっしゃいますので、そちらをご参照ください(データフロー制御フレームワークLuigiを使ってビッグデータ解析をする、Luigi によるワークフロー管理など)。
Parameter の使い方
タスクの動作制御に不可欠な Parameter の扱いに関してです。
Parameter として定義したもの以外のインスタンス変数は使わない
当然のことですが、Parameter を利用しなかった場合、異なる設定で走らせたいタスクを異なるものとしてLuigiは認識してくれません。
また、ワークフローを読み込んだ際に Parameter 以外のインスタンス変数を設定するには __init__ メソッドを書く必要があり、記述が不要に煩雑になります。
タスクを継承してタスクを作成する場合、親クラスの Parameter を全てオーバーライドする
※ 2018-08-27 追記: この項および次の項で槍玉に挙げられているような状況では @requires
および @inherits
デコレータを用いる方が良いです。
https://luigi.readthedocs.io/en/stable/api/luigi.util.html
Luigi では、あるタスクを継承したタスクを依存タスクとしてインスタンス化する際、継承したクラス群でクラス変数に定義されている Parameter をすべて設定しないといけません。
例えば次のようにタスク群を定義したとします。
class Parent(luigi.ExternalTask):
hoge = luigi.Parameter()
class Child(Parent):
foo = luigi.Parameter()
# MissingParameterException が投げられる
class All1(luigi.WrapperTask):
def requires(self):
yield Child(foo="foo")
# 実行される
class All2(luigi.WrapperTask):
def requires(self):
yield Child(foo="foo", hoge="hoge")
上記の All2 は実行可能ですが、foo にしか値を設定せずに Child をインスタンス化しようとしている All1 は、実行しようとすると MissingParameterException が投げられます。 Child のクラス変数 foo だけでなく、Parent のクラス変数として定義されている hoge も設定しなければいけません。
であれば、以下のように何を設定する必要があるのか明示的になっている方が親切ではないかと考えられます。
class Child(Parent):
foo = luigi.Parameter()
hoge = luigi.Parameter()
煩雑な Parameter 群は dict にまとめて用いる
例えば、csv.writerを用いて出力するタスクで、csv.writerの動作をキーワード引数で変更した計算を流せるようにしたい、というような状況を考えます。このとき、csv.writerのキーワード引数を一つ一つ Parameter として持つよりも、下記のように一つの Parameter にまとめて保持しておく方が、より明快でかつ変更に対して柔軟です。
class OutputCSV(luigi.Task):
csv_writer_kwargs = luigi.Parameter(default=dict(sep='\t'))
...
def run(self):
with open(self.output().fn, 'wb') as file:
writer = csv.writer(file, **self.csv_writer_kwargs)
...
変更の可能性が高い依存タスクは依存タスクのクラス自体をパラメーターにする
例えば、TaskB と TaskC が TaskA に依存しているワークフローがあるとします。ここで、 TaskB や TaskC を別のタスクに対して使いまわすワークフローを作成する可能性が高いならば、次の例1よりは例2のような形にしておく方が良いでしょう。
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskB(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class TaskC(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskB(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class TaskC(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
ここで、TaskA2 の結果に対して TaskB と TaskC を実行するという処理を追加するとします。両者に必要な修正点を比較すると以下のようになります。
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskA2(luigi.ExternalTask):
param2 = luigi.Parameter()
...
class TaskB(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class TaskC(luigi.Task):
def requires(self):
yield TaskA(param1="hoge")
...
class TaskB2(TaskB):
def requires(self):
yield TaskA2(param2="foo")
...
class TaskC2(TaskC):
def requires(self):
yield TaskA2(param2="foo")
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
yield TaskB2()
yield TaskC2()
class TaskA(luigi.ExternalTask):
param1 = luigi.Parameter()
...
class TaskA2(luigi.ExternalTask):
param2 = luigi.Parameter()
...
class TaskB(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class TaskC(luigi.Task):
required_task = luigi.Parameter(default=TaskA)
params = luigi.Parameter(default=dict(param1="hoge"))
def requires(self):
yield self.required_task(**self.params)
...
class All(luigi.WrapperTask):
def requires(self):
yield TaskB()
yield TaskC()
yield TaskB(
required_task=A2,
params=dict(param2="foo"))
yield TaskC(
required_task=A2,
params=dict(param2="foo"))
このように、例2のような形であれば All を書き換えるだけで良くなり、例1のように TaskB2 や TaskC2 のようなものは定義せずに済みます。
結果の整合性を保つための方法
以下で述べることはすべて complete メソッドに関わります。Task クラスの complete メソッドのデフォルト動作は、単に出力 Target の exists メソッドが True を返すかどうかです。整合性のある出力を得ることの優先度が高いのであれば、デフォルトの complete メソッドは用いるのではなく、下記の3点を守った complete メソッドを定義した方が良いでしょう。ただし、ワークフローが途中で止まったり再計算されたりする状況が増えますので、用途によっては不適当かもしれません。
complete では依存タスクの complete がすべて True を返すかをチェックする
ワークフローに含まれるタスク全てで依存タスクの complete メソッドを呼び出すように定義しておけば、ワークフローの末端のタスクの complete が呼ばれた時に、再帰的にワークフロー全体について complete が呼び出されます。そして、complete が False を返す箇所が見つかればそこから下流のタスクが全て実行されます。
ちなみに、WrapperTask の complete メソッドは正にこの「依存タスクの complete の返り値がすべて True であれば True を返す」という動作です。
complete メソッドでは入出力のタイムスタンプをチェックする
出力日時が入力日時よりも前であれば complete メソッドが False を返すようにしておきます。途中結果の一部を修正した後、必要な箇所全体に修正が反映されるようにするためです。
complete メソッドに計算結果の検証処理を入れる
ワークフローの途中でおかしな結果が出たらその時点で止めることが出来ます。また、結果が出ている場合にもワークフローを再実行することで検算を行うことが出来ます。