はじめに
色々な場面でデータの処理において、依存関係をもたせた処理を書きたいときがあったりします。Pythonではそのような処理を行えるライブラリがあります。
Luigiについて
Pythonでパイプライン処理をするための便利なライブラリーです。
こちらはSpotifyが開発しているパイプライン処理ライブラリーです。
簡単に依存関係を作ることができ、かつそれを可視化して状態を見ることもできます。
Document: https://luigi.readthedocs.io/en/stable/
Gitリポジトリ:https://github.com/spotify/luigi
スライド:https://www.slideshare.net/EliasFreider/luigi-pydata-presentation
Luigiのインストール方法
$ pip install luigi
Luigiの使い方
基本的な構造
luigiの基本的な動作としては、タスクが実行されると設定した依存ファイルを吐き出します。その依存ファイルをみて、実行済みかどうかを判断します。例えば途中で例外が発生して、落ちた場合、依存ファイルが出力されていないところから始まるため、再実行も容易になっています。全てを再実行する場合は、依存ファイルを消せば全て再実行できます。
- requires:依存するクラス名を記述します
- output:依存ファイルの出力先を設定します
- run:タスクの内容を記述します
import luigi
class XXXX(luigi.Task):
def requires(self):
# ここに依存したいタスクのクラス名を記入
return []
def output(self):
# ここでは依存管理するためのファイルの出力先を設定します
return luigi.LocalTarget(file_path)
def run(self):
# ここに実行したい処理を記述します
with self.output().open('w') as out_file:
# 依存関係ファイルの出力処理
out_file.write("")
if __name__ == '__main__':
# luigiの実行
luigi.run()
実行の仕方
luigidサーバの実行
$ luigid
luigiの実行
引数として、最後に動かしたいタスククラス名を書き、必要ならばプロセス数のコントロールもできます。
$ python xxxx.py 最後に動かしたいタスククラス名 --workers プロセス数
サンプル1
下図のような依存関係を作ります。TaskBはTaskAが終わってからでないと実行できないようにします。
sample1.pyの作成
import luigi
import time
class TaskA(luigi.Task):
"""
TaskA class
"""
def requires(self):
return []
def output(self):
return luigi.LocalTarget('tmp/sample1/TaskA')
def run(self):
time.sleep(10)
with self.output().open('w') as out_file:
out_file.write("")
class TaskB(luigi.Task):
"""
TaskB class
"""
def requires(self):
return [TaskA(), ]
def output(self):
return luigi.LocalTarget('tmp/sample1/TaskB')
def run(self):
time.sleep(10)
with self.output().open('w') as out_file:
out_file.write("")
if __name__ == '__main__':
luigi.run()
sample1.pyの実行
luigidサーバの実行
$ luigid
luigiの実行
$ python sample1.py TaskB
サンプル2
次は一つのタスクを二つのタスクが依存しているパターンです。
TaskCとTaskDはTaskBが終わるまで実行できません。
sample2.pyの作成
import luigi
import time
class TaskA(luigi.Task):
"""
TaskA class
"""
def requires(self):
return []
def output(self):
return luigi.LocalTarget('tmp/sample2/TaskA')
def run(self):
time.sleep(10)
with self.output().open('w') as out_file:
out_file.write("")
class TaskB(luigi.Task):
"""
TaskB class
"""
def requires(self):
return [TaskA(), ]
def output(self):
return luigi.LocalTarget('tmp/sample2/TaskB')
def run(self):
time.sleep(10)
with self.output().open('w') as out_file:
out_file.write("")
class TaskC(luigi.Task):
"""
TaskC class
"""
def requires(self):
return [TaskB(), ]
def output(self):
return luigi.LocalTarget('tmp/sample2/TaskC')
def run(self):
time.sleep(10)
with self.output().open('w') as out_file:
out_file.write("")
class TaskD(luigi.Task):
"""
TaskD class
"""
def requires(self):
return [TaskB(), ]
def output(self):
return luigi.LocalTarget('tmp/sample2/TaskD')
def run(self):
time.sleep(10)
with self.output().open('w') as out_file:
out_file.write("")
class TaskEnd(luigi.Task):
"""
TaskEnd class
"""
def requires(self):
return [TaskC(), TaskD(), ]
def output(self):
return luigi.LocalTarget('tmp/sample2/TaskEnd')
def run(self):
time.sleep(10)
with self.output().open('w') as out_file:
out_file.write("")
if __name__ == '__main__':
luigi.run()
sample2.pyの実行
luigidサーバの実行
$ luigid
luigiの実行
$ python sample2.py TaskEnd
サンプル3
最後は同じタスクを引数ごとに処理を変えたいパターンです。例えば複数のサーバにファイルをコピーしたいときなどに便利です。
sample3.pyの作成
import luigi
import time
class TaskA(luigi.Task):
"""
TaskA class
"""
def requires(self):
return []
def output(self):
return luigi.LocalTarget('tmp/sample3/TaskA')
def run(self):
time.sleep(10)
with self.output().open('w') as out_file:
out_file.write("")
class TaskB(luigi.Task):
"""
TaskB class
"""
param = luigi.Parameter()
def requires(self):
return [TaskA(), ]
def output(self):
return luigi.LocalTarget('tmp/sample3/TaskB')
def run(self):
print(self.param)
time.sleep(10)
with self.output().open('w') as out_file:
out_file.write("")
class TaskC(luigi.Task):
"""
TaskC class
"""
def requires(self):
for i in ['serverA', 'serverB', 'serverC', 'serverD']:
yield TaskB(i)
def output(self):
return luigi.LocalTarget('tmp/sample3/TaskC')
def run(self):
time.sleep(10)
with self.output().open('w') as out_file:
out_file.write("")
class TaskEnd(luigi.Task):
"""
TaskEnd class
"""
def requires(self):
return [TaskC(), ]
def output(self):
return luigi.LocalTarget('tmp/sample3/TaskEnd')
def run(self):
time.sleep(10)
with self.output().open('w') as out_file:
out_file.write("")
if __name__ == '__main__':
luigi.run()
sample3.pyの実行
luigidサーバの実行
$ luigid
luigiの実行
$ python sample3.py TaskEnd