12
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Python, LuigiでPipeline管理の基本を学ぶ

Last updated at Posted at 2020-06-26

Luigi-readthedocs.io
Luigi-SlideShare公式

導入

Luigi-GitHub公式にはこう書いてあります。

原文:The purpose of Luigi is to address all the plumbing typically associated with long-running batch processes.
雑訳:ルイージの目的は全ての配管-特に長大にわたるバッチプロセスに関係する配管-を提供することです

要は様々なシステムで相互に依存関係のあるプロセス/バッチタスクを繋げる配管を作り、バラバラの物をLuigiのPipelineで繋げて管理しようと言うことだと思います。

機械学習の過程で(そうでなくても多数の人にサービスを提供する場合)頻繁に起きるデータ集計処理、日次、週次のバッチプロセスなどの場合、コードは膨大になり、例えば何かのプロセスにバグがあり切り替えた時に、他のプロセスへの影響などを考え管理するのは大変です。気が狂いそうになります。

私自身はずっとチーム一人で機械学習プロジェクトを回してたので、
JupyterNotebookを最初から全部Runさせることで対応していたのですが、
そうすると更新していない無駄な部分も回すことになります。
それに対し、PipeLineですとバックフォワード的に依存関係を確認し、
更新したタスク以降しか実行しないで済むので、効率的です。

Luigi-GitHub公式:Who uses luigiをみると、ユーザーとして例えば以下の企業/団体がいます。

・Spotify(開発企業なので当然)
・Groupon
・TreasureData
・Leipzig University
・Dow Jones
・Redhat

これは紹介されている全体の5%程度であり、広範な業界/団体で利用されている感じです。

#参考にしたページ

データフロー制御フレームワークLuigiを使ってビッグデータ解析をする

上の記事で名前の由来を推測してます。

名前のLuigiの由来は、データフローを配水管に例え、「世界で2番目に有名な緑色の服を身にまとった配管工」だとか…。赤じゃなくて緑なのは、Spotifyのコーポレートカラーと同じだからでしょうか(笑)。

まぁ、使う側としてはなんでもいいです。

書き方

データフロー制御フレームワークLuigiを使ってビッグデータ解析をする#書き方をみていただくのが一番わかりいいです。
ざっくりとだけ話すと、

・最小の実行単位はTask
・TaskはClassとして定義される
・Task間の依存関係、変数引き継ぎはClass内の関数として定義される

です。以下に最小単位の例を貼り付けておきますね。

Pythonでパイプライン管理#基本的な構造
公式GitHubのサンプルファイル top_artists.py
の2つを参考にして自分なりの理解で書いています。

minimum.py
import luigi
class TaskX(luigi.Task):
    """
    TaskX class
    """
    def requires(self):
        return []         
        # 先行するタスクがない場合は空のリストのまま、ある場合はタスク名記載

    def output(self):
        return luigi.LocalTarget(<path_to_managed_file>)
        # 各タスクの依存関係などを管理するマネージメントファイルを指定

    def run(self):
        # 実行したい処理をここに書きます。

if __name__ == '__main__':
    # luigiの実行
    luigi.run()

Sample1

Pythonでパイプライン処理するを参考に書き換えています。併せて参考にしていただければと思います。

コード

sample1.py
import os
import luigi

fileA = './tmp/Sample1_TaskA.txt'
fileB = './tmp/Sample1_TaskB.txt'
textA = 'TaskA SUCCEEDED!!'

path = './tmp/'
os.makedirs(path)

fileB = 'Sample1_TaskB.txt'
textA = 'TaskA SUCCEEDED!!'

class TaskA(luigi.Task):
    """
    TaskA class
    """
    def requires(self):
        return []         # In case of no pre-running task, an empty list.

    def output(self):
        return luigi.LocalTarget('tmp/sample1/TaskA')  # Where the managiment file is.

    def run(self):
        with open('./tmp/Sample1_TaskA.txt', 'w') as f:
            f.write(textA)

        with self.output().open('w') as out_file:
            out_file.write("")  # out_file 

class TaskB(luigi.Task):
    """
    TaskB class
    """

    def requires(self):
        return [TaskA(), ]  # TaskB depends on TaskA.

    def output(self):
        return luigi.LocalTarget('./tmp/sample1/TaskB') # The managiment file is in sample1

    def run(self):
        textB = 'TaskB, SUCCEEDED!! \n'
        with open(path+fileB, 'w') as f:
            f.write(textB)
            f.write(textA)

        with self.output().open('w') as out_file:
            out_file.write("") # out_file

if __name__ == '__main__':
    luigi.run()

TaskA->TaskBの順番で実行し、
TaskAでは'Sample_TaskA.txt'を新規作成し**TaskA SUCCEEDED!!と記入します。
TaskBでは'Sample_TaskB.txt'を新規作成し
TaskB SUCCEEDED!!と記入して、
次の行に
TaskB SUCCEEDED!!**と入力します。

実行

手順

  1. terminal上でluigidを入力しLuigiサーバーを起動 (localhost:8082)
  2. terminal上で python sample1.py TaskX として起動したいTaskXまでを起動。

以上

TaskAだけ実行してみる

terminal実行結果
>$luigid
2020-04-29 09:25:03,270 luigi[4129] INFO: logging configured by default settings
2020-04-29 09:25:03,274 luigi.scheduler[4129] INFO: No prior state file exists at /var/lib/luigi-server/state.pickle. Starting with empty state
2020-04-29 09:25:03,278 luigi.server[4129] INFO: Scheduler starting up

サーバーが走ったよとメッセージが出てきます。ここで出てくるSchedulerについては後ほど別エントリで書きます。

ブラウザ上でlocalhost:8082を入力すると以下の画面が出てきます。
まだsample1.pyを起動していないからか全てのカテゴリが'?'になってます

スクリーンショット 2020-04-29 9.26.39.png
sample1.py実行結果
$python sample1.py TaskA
DEBUG: Checking if TaskA() is complete
INFO: Informed scheduler that task   TaskA__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 4139] Worker Worker(salt=691810635, workers=1, host=user_name-MacBook-Pro.local, username=user_name, pid=4139) running   TaskA()
INFO: [pid 4139] Worker Worker(salt=691810635, workers=1, host=user_name-MacBook-Pro.local, username=user_name, pid=4139) done      TaskA()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   TaskA__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=xxxxxxxxxx, workers=1, host=users-MacBook-Pro.local, username=user_name, pid=xxxx) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 TaskA()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

大事なところは ====Luigi Execusion Summary==== 以下の部分ですね。
ユニークなのが This progress looks :) と結果が顔文字で表示されているところ。
失敗すると **:(**となります。

実行結果を見てみましょう。

$ vim tmp/Sample1_TaskA.txt
スクリーンショット 2020-04-29 9.34.53.png

大丈夫そうですね。

続けてTaskBを実行してみる

$ python sample1.py TaskB
実行結果エラー
$ python sample1.py TaskB
Traceback (most recent call last):
  File "sample1.py", line 9, in <module>
    os.makedirs(path)
  File "/Users/user_name/././lib/python3.7/os.py", line 221, in makedirs
    mkdir(name, mode)
FileExistsError: [Errno 17] File exists: './tmp/'

エラーが出ました。ちゃんとtmpフォルダを削除しましょう。

実行結果
$ python sample1.py TaskB
(略)
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 TaskA()
    - 1 TaskB()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

先ほどのTaskAに加えてTaskBも実行できているのがわかります。
結果も画像は載せませんが期待した通りの出力になっています。

TaskAが成功した状態でTaskBまでを実行してみる

先ほどと同じことをやろうとしています。
先ほどはtmpファイルの問題でエラーが出たので、
今度はtmpファイルはデフォルトで作成しておいて、
makedirsをコメントアウトして実行しました。

sample1.pyの一部
path = './tmp/'
#os.makedirs(path)  <- コメントアウト
実行
$ python sample1.py TaskA
(略)
$ python sample1.py TaskB
(略)
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 TaskA()
* 1 ran successfully:
    - 1 TaskB()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

先ほどの結果とは違いTaskAはもう完了しているよと言うメッセージが出て、
TaskBだけranしているのがわかりますでしょうか?
これが最初の方で話していた

更新したタスク以降しか実行しないで済むので、効率的です。

にあたります。ここでは厳密には更新ではないのですが、
実行する必要のないTaskは実行しないと言う仕組みがわかるかと思います。

sample1.pyでわかったこと

いろいろなレベル感を混ぜて書きますが、

・pythonに慣れている人なら特に苦もなく書けそう
・Classの外でGlobal変数を用意すればそのままTaskで利用できる
・既に完了しているタスクは再実行しないでその続きから実行する
・Classの中で変数を定義した場合、他のClassにはそのまま引き継がれない
・サーバーを起動させない状態で.pyファイルだけ実行するとエラーが出る

後ろの2つは当然といえば当然ですね。
4つ目の変数引き継ぎについては StackOverFlowPassing Python objects between Tasks in Luigi?公式Doc:Parametersにあるように、
Task間で渡せるものに機械学習の予測結果などは入っていないようです(確信はないが確度は高い)。
そういった事が発生する場合はpickleなどで掃き出して、別タスクで引用する必要がありそうです。

AirFlowよりはお手軽な感じがするので、現状はこちらを使ってみようかなと思います。

おまけ

以下にはいろいろ試した結果があります。
全部俺:AdventCalender

以下には仕事上の便利なノウハウがある感じです。
Luigi で作成するワークフローの再利用性・メンテナンス性を高めるために

12
6
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?