LoginSignup
13
14

More than 5 years have passed since last update.

Pythonでパイプライン処理をする

Last updated at Posted at 2017-11-21

はじめに

色々な場面でデータの処理において、依存関係をもたせた処理を書きたいときがあったりします。Pythonではそのような処理を行えるライブラリがあります。

Luigiについて

Pythonでパイプライン処理をするための便利なライブラリーです。
こちらはSpotifyが開発しているパイプライン処理ライブラリーです。
簡単に依存関係を作ることができ、かつそれを可視化して状態を見ることもできます。
Document: https://luigi.readthedocs.io/en/stable/
Gitリポジトリ:https://github.com/spotify/luigi
スライド:https://www.slideshare.net/EliasFreider/luigi-pydata-presentation

Luigiのインストール方法

python
$ pip install luigi

Luigiの使い方

基本的な構造

luigiの基本的な動作としては、タスクが実行されると設定した依存ファイルを吐き出します。その依存ファイルをみて、実行済みかどうかを判断します。例えば途中で例外が発生して、落ちた場合、依存ファイルが出力されていないところから始まるため、再実行も容易になっています。全てを再実行する場合は、依存ファイルを消せば全て再実行できます。

  • requires:依存するクラス名を記述します
  • output:依存ファイルの出力先を設定します
  • run:タスクの内容を記述します
sample
import luigi

class XXXX(luigi.Task):
    def requires(self):
        # ここに依存したいタスクのクラス名を記入
        return []

    def output(self):
        # ここでは依存管理するためのファイルの出力先を設定します
        return luigi.LocalTarget(file_path)

    def run(self):

        # ここに実行したい処理を記述します

        with self.output().open('w') as out_file:
            # 依存関係ファイルの出力処理
            out_file.write("")


if __name__ == '__main__':
    # luigiの実行
    luigi.run()

実行の仕方

luigidサーバの実行

$ luigid

luigiの実行

引数として、最後に動かしたいタスククラス名を書き、必要ならばプロセス数のコントロールもできます。

console
$ python xxxx.py 最後に動かしたいタスククラス名 --workers プロセス数

サンプル1

下図のような依存関係を作ります。TaskBはTaskAが終わってからでないと実行できないようにします。
スクリーンショット 2017-11-20 17.48.41.png

sample1.pyの作成

sample1.py
import luigi
import time


class TaskA(luigi.Task):
    """
    TaskA class
    """
    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('tmp/sample1/TaskA')

    def run(self):

        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskB(luigi.Task):
    """
    TaskB class
    """

    def requires(self):
        return [TaskA(), ]

    def output(self):
        return luigi.LocalTarget('tmp/sample1/TaskB')

    def run(self):
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


if __name__ == '__main__':
    luigi.run()

sample1.pyの実行

luigidサーバの実行

$ luigid

luigiの実行

$ python sample1.py TaskB

サンプル2

次は一つのタスクを二つのタスクが依存しているパターンです。
TaskCとTaskDはTaskBが終わるまで実行できません。

スクリーンショット 2017-11-20 17.55.33.png

sample2.pyの作成

sample2.py
import luigi
import time


class TaskA(luigi.Task):
    """
    TaskA class
    """
    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('tmp/sample2/TaskA')

    def run(self):

        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskB(luigi.Task):
    """
    TaskB class
    """

    def requires(self):
        return [TaskA(), ]

    def output(self):
        return luigi.LocalTarget('tmp/sample2/TaskB')

    def run(self):
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskC(luigi.Task):
    """
    TaskC class
    """

    def requires(self):
        return [TaskB(), ]

    def output(self):
        return luigi.LocalTarget('tmp/sample2/TaskC')

    def run(self):
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskD(luigi.Task):
    """
    TaskD class
    """

    def requires(self):
        return [TaskB(), ]

    def output(self):
        return luigi.LocalTarget('tmp/sample2/TaskD')

    def run(self):
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskEnd(luigi.Task):
    """
    TaskEnd class
    """
    def requires(self):
        return [TaskC(), TaskD(), ]

    def output(self):
        return luigi.LocalTarget('tmp/sample2/TaskEnd')

    def run(self):
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")

if __name__ == '__main__':
    luigi.run()

sample2.pyの実行

luigidサーバの実行

$ luigid

luigiの実行

$ python sample2.py TaskEnd

サンプル3

最後は同じタスクを引数ごとに処理を変えたいパターンです。例えば複数のサーバにファイルをコピーしたいときなどに便利です。
スクリーンショット 2017-11-20 18.01.41.png

sample3.pyの作成

sample3.py
import luigi
import time


class TaskA(luigi.Task):
    """
    TaskA class
    """
    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('tmp/sample3/TaskA')

    def run(self):

        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskB(luigi.Task):
    """
    TaskB class
    """
    param = luigi.Parameter()

    def requires(self):
        return [TaskA(), ]

    def output(self):
        return luigi.LocalTarget('tmp/sample3/TaskB')

    def run(self):
        print(self.param)
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskC(luigi.Task):
    """
    TaskC class
    """

    def requires(self):
        for i in ['serverA', 'serverB', 'serverC', 'serverD']:
            yield TaskB(i)

    def output(self):
        return luigi.LocalTarget('tmp/sample3/TaskC')

    def run(self):
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


class TaskEnd(luigi.Task):
    """
    TaskEnd class
    """
    def requires(self):
        return [TaskC(), ]

    def output(self):
        return luigi.LocalTarget('tmp/sample3/TaskEnd')

    def run(self):
        time.sleep(10)

        with self.output().open('w') as out_file:
            out_file.write("")


if __name__ == '__main__':
    luigi.run()

sample3.pyの実行

luigidサーバの実行

$ luigid

luigiの実行

$ python sample3.py TaskEnd
13
14
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
14