Python

Pythonでパイプライン処理をする

More than 1 year has passed since last update.

はじめに

色々な場面でデータの処理において、依存関係をもたせた処理を書きたいときがあったりします。Pythonではそのような処理を行えるライブラリがあります。

Luigiについて

Pythonでパイプライン処理をするための便利なライブラリーです。
こちらはSpotifyが開発しているパイプライン処理ライブラリーです。
簡単に依存関係を作ることができ、かつそれを可視化して状態を見ることもできます。
Document: https://luigi.readthedocs.io/en/stable/
Gitリポジトリ:https://github.com/spotify/luigi
スライド:https://www.slideshare.net/EliasFreider/luigi-pydata-presentation

Luigiのインストール方法

python
$ pip install luigi

Luigiの使い方

基本的な構造

luigiの基本的な動作としては、タスクが実行されると設定した依存ファイルを吐き出します。その依存ファイルをみて、実行済みかどうかを判断します。例えば途中で例外が発生して、落ちた場合、依存ファイルが出力されていないところから始まるため、再実行も容易になっています。全てを再実行する場合は、依存ファイルを消せば全て再実行できます。

  • requires:依存するクラス名を記述します
  • output:依存ファイルの出力先を設定します
  • run:タスクの内容を記述します
sample
import luigi

class XXXX(luigi.Task):
    def requires(self):
        # ここに依存したいタスクのクラス名を記入
        return []

    def output(self):
        # ここでは依存管理するためのファイルの出力先を設定します
        return luigi.LocalTarget(file_path)

    def run(self):

        # ここに実行したい処理を記述します

        with self.output().open('w') as out_file:
            # 依存関係ファイルの出力処理
            out_file.write("")


if __name__ == '__main__':
    # luigiの実行
    luigi.run()

実行の仕方

luigidサーバの実行

$ luigid

luigiの実行

引数として、最後に動かしたいタスククラス名を書き、必要ならばプロセス数のコントロールもできます。

console
$ python xxxx.py 最後に動かしたいタスククラス名 --workers プロセス数

サンプル1

下図のような依存関係を作ります。TaskBはTaskAが終わってからでないと実行できないようにします。
スクリーンショット 2017-11-20 17.48.41.png

sample1.pyの作成

sample1.py
import luigi
import time


class TaskA(luigi.Task):
    """
    TaskA class
    """
    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('tmp/sample1/TaskA')

    def run(self):

        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskB(luigi.Task):
    """
    TaskB class
    """

    def requires(self):
        return [TaskA(), ]

    def output(self):
        return luigi.LocalTarget('tmp/sample1/TaskB')

    def run(self):
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


if __name__ == '__main__':
    luigi.run()

sample1.pyの実行

luigidサーバの実行

$ luigid

luigiの実行

$ python sample1.py TaskB

サンプル2

次は一つのタスクを二つのタスクが依存しているパターンです。
TaskCとTaskDはTaskBが終わるまで実行できません。

スクリーンショット 2017-11-20 17.55.33.png

sample2.pyの作成

sample2.py
import luigi
import time


class TaskA(luigi.Task):
    """
    TaskA class
    """
    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('tmp/sample2/TaskA')

    def run(self):

        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskB(luigi.Task):
    """
    TaskB class
    """

    def requires(self):
        return [TaskA(), ]

    def output(self):
        return luigi.LocalTarget('tmp/sample2/TaskB')

    def run(self):
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskC(luigi.Task):
    """
    TaskC class
    """

    def requires(self):
        return [TaskB(), ]

    def output(self):
        return luigi.LocalTarget('tmp/sample2/TaskC')

    def run(self):
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskD(luigi.Task):
    """
    TaskD class
    """

    def requires(self):
        return [TaskB(), ]

    def output(self):
        return luigi.LocalTarget('tmp/sample2/TaskD')

    def run(self):
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskEnd(luigi.Task):
    """
    TaskEnd class
    """
    def requires(self):
        return [TaskC(), TaskD(), ]

    def output(self):
        return luigi.LocalTarget('tmp/sample2/TaskEnd')

    def run(self):
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")

if __name__ == '__main__':
    luigi.run()

sample2.pyの実行

luigidサーバの実行

$ luigid

luigiの実行

$ python sample2.py TaskEnd

サンプル3

最後は同じタスクを引数ごとに処理を変えたいパターンです。例えば複数のサーバにファイルをコピーしたいときなどに便利です。
スクリーンショット 2017-11-20 18.01.41.png

sample3.pyの作成

sample3.py
import luigi
import time


class TaskA(luigi.Task):
    """
    TaskA class
    """
    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('tmp/sample3/TaskA')

    def run(self):

        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskB(luigi.Task):
    """
    TaskB class
    """
    param = luigi.Parameter()

    def requires(self):
        return [TaskA(), ]

    def output(self):
        return luigi.LocalTarget('tmp/sample3/TaskB')

    def run(self):
        print(self.param)
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskC(luigi.Task):
    """
    TaskC class
    """

    def requires(self):
        for i in ['serverA', 'serverB', 'serverC', 'serverD']:
            yield TaskB(i)

    def output(self):
        return luigi.LocalTarget('tmp/sample3/TaskC')

    def run(self):
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskEnd(luigi.Task):
    """
    TaskEnd class
    """
    def requires(self):
        return [TaskC(), ]

    def output(self):
        return luigi.LocalTarget('tmp/sample3/TaskEnd')

    def run(self):
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


if __name__ == '__main__':
    luigi.run()

sample3.pyの実行

luigidサーバの実行

$ luigid

luigiの実行

$ python sample3.py TaskEnd