Help us understand the problem. What is going on with this article?

Luigiで行うパイプライン処理をより円滑に!gokartについて紹介

これなに?

Luigiのラッパーライブラリであるgokartについてのまとめ記事です。

開発の動機や、基本的な使い方はM3さんのブログにとても丁寧にまとまっており、基本的な使い方はここを読めばよいやんという話ですが、逆引きリファレンス的にまとめたかったので、記事にしました。

なお、Luigi本体の機能についてはあまり説明しません。

Luigiとは?

Spotifyが開発しているPipelineフレームワークのOSSの一種です。Pythonで実装されており、luigi.Taskを継承して

  • requires(): 依存しているTask
  • run(): 実行する処理
  • output(): 出力先

の3つのメソッドを記述するだけで、簡単にワークフローが作れます。 

名前の由来は

Also it should be mentioned that Luigi is named after the pipeline-running friend of Super Mario.

とのことです

gokartとは?

そのLuigiをより使いやすくしたラッパーライブラリがgokartです。

名前の由来はたぶんマリオ(カート)からですね。

基本

以下、gokart==0.3.6の機能についてまとめます。

Taskの構築

タスクを作成する際は、luigi.Taskではなく、gokart.TaskOnKartを継承して作成します。

    import gokart

    class TaskA(gokart.TaskOnKart):
        def run(self):
            data= pd.DataFrame(load_iris()['data'])
            self.dump(data)

    class TaskB(gokart.TaskOnKart):
        def reuires(self):
             return TaskA()

            # outputは省略可能  
        def output(self):
            return self.make_target('data.pkl')

        def run(self):
            df =self.load()

            self.dump(df)

基本的な使い方はLuigiと変わりませんが、self.dump(保存したいobject)とするだけでよいので、Luigi単体で同じ処理をするのと比べ、だいぶ簡略化できます。さらに、def output(self)も省略でき、その場合はpickle形式で保存されます。

実行

以下のようにして実行します。

    gokart.run(['TaskB', '--local-scheduler'])

実行すると、以下のようにresources配下にobjectが保存されます。

    resources
    ├── data_3eba828ec57403111694b0fb1e3f62e0.pkl
    └── log
        ├── module_versions
        │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.txt
        ├── processing_time
        │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
        ├── task_log
        │   └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
        └── task_params
            └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl

指定したファイル名にhash値がついて保存されてます。hash値はTaskのパラメータによって決まるので、Luigi単体の場合と異なり、パラメータを変えるとTaskが再実行されます。これもgokartのメリットの一つです。(後述しますが、hash値をつけないでの保存も可能です)

logには、

  • 利用したモジュールのバージョン
  • 処理にかかった時間
  • loggerで出力されたlog
  • 適用されたパラメータ

が保存されています。

保存先のPATHの指定

デフォルトだと、resources配下に保存されますが、保存先のディレクトリは設定ファイルに

    [TaskOnKart]
    workspace_directory=./output

のように指定することで変更可能です。

load

DataFrameを結合してからload

self.load()でももちろん、保存されたDataFrameをloadできますが、[df1, df2, df3...]のようなDataFrameの集合をloadする場合、load_dataframeを使用すると、複数のDataFrameが縦に結合された状態でloadできます。

また、オプションでsetでカラムを指定すると、そのカラムがloadするDataFrameに存在しない場合、例外を発生させることもできます。

    class TaskA(gokart.TaskOnKart):
        def run(self):
            df1 = pd.DataFrame([1,2], columns=['target'])
            df2 = pd.DataFrame([3,4], columns=['target'])
            df3 = pd.DataFrame([5,6], columns=['target'])
            self.dump([df1, df2, df3])

    class TaskB(gokart.TaskOnKart):
            def requires(self):
                return TaskA()

        def run(self):
                    # concatされてからloadされる
            df =self.load_data_frame(required_columns={'target'})
            self.dump(df)

keyで指定してload

依存しているTaskが複数ある場合、以下のように辞書形式で依存するTaskを定義し、keyで読み込むことができます。Luigi単体でも複数のTaskをloadできますが、辞書の形はサポートしていないため、辞書形式を使うことで、コードの可読性を上げることができます。

    class TrainModel(gokart.TaskOnKart):
        def requires(self):
            return {'data': LoadData(), 'target': LoadTarget()}

        def run(self):
            data = self.load('data')
            target = self.load('target')

            model = LogisticRegression()
            model.fit(data, target)

            self.dump(model)

逐次的にload

self.load_generatorを使うと、Taskを逐次的にloadして処理することができます。

    from sklearn.datasets import load_iris
    from sklearn.datasets import load_wine


    class LoadWineData(gokart.TaskOnKart):
        def run(self):
            data = load_wine()['data']

            self.dump(data)

    class LoadIrisData(gokart.TaskOnKart):
        def run(self):
            data = load_iris()['data']

            self.dump(data)

    class LoadGenerator(gokart.TaskOnKart):
        def requires(self):
            return [LoadWineData(), LoadIrisData()]

        def run(self):
            for data in self.load_generator():
                print(f'data_shape={data.shape}')
                # data_shape=(178, 13)
                # data_shape=(150, 4)

output

hash値を付けないで保存

use_unique_id=Falseとすれば、ファイル名にhash値が付随しません。

    def output(self):
            return self.make_target('data.pkl', use_unique_id=False)

複数ファイルにまたがるモデルを保存

gensimやTensorFlowなど、モデルが複数ファイルにまたがって保存される形式の場合、以下のようにmake_model_targetを使うと、まとめて圧縮して保存できます。

    def output(self):
            return self.make_model_target(
                'model.zip', 
                save_function=gensim.model.Word2Vec.save,
                load_function=gensim.model.Word2Vec.load)

のように、パラメータとして、保存、復元用の関数を渡すことで、(今回の場合は)zip形式でモデルとload_functionがセットで圧縮されて保存され、呼び出し側のTaskは特に気にすることなく、self.load()でモデルを復元できます。

巨大なDataFrameを保存

以下のようにmake_large_data_frame_targetを使うと、max_byteで指定した容量ごとにDataFrameがレコード単位で複数に分割され、一つに圧縮された後、保存されます。

    def output(self):
            return self.make_large_data_frame_target('large_df.zip', max_byte=2**10)

ちなみに内部では前述のmake_model_targetが使用されています。

DataFrameを様々な形式で保存

pickle以外の形式でDataFrameを変換した上で保存したい場合、その形式の拡張子をつけるだけで、内部のFileProcessorにより、対象の形式に変換された上で保存されます。

現在サポートしている形式は、

    - pickle
    - npz
    - gz
    - txt
    - csv
    - tsv
    - json
    - xml

です。

    class LoadWineData(gokart.TaskOnKart):
        def run(self):
            data = load_wine()['data']

            self.dump(data)

    class ToCSV(gokart.TaskOnKart):
        def requires(self):
            return LoadWineData()

        def output(self):
            # suffixに保存したい拡張子を定義
            return self.make_target('./wine.csv')

        def run(self):
            df = pd.DataFrame(self.load())
            self.dump(df)

保存先をGCSやS3に指定

設定ファイルに記述したwork_space_directoryのpathの先頭がgs://の場合はGCSに、s3://の場合、S3にすべての出力結果がuploadされます.

    [TaskOnKart]
    # prefixをgs://またはs3://とすると、すべてのoutputがクラウド上に保存される
    workspace_directory=gs://resources/

Luigi単体の場合と違いコードに一切手を入れずに変更できるため、非常に便利です。

その他

環境変数からParameterを指定

parameter=${環境変数}と設定ファイルに記述することで、環境変数でTaskのParameterを指定できます。

テストと本番を切り分けたり、実行する環境ごとにクラウドに保存するか否かを変更したいときなど、非常に便利です。

  • 設定ファイル
    [TaskOnKart]
    workspace_directory=${WORK_SPACE}

    [feature.LoadTrainTask]
    is_test=${IS_TEST}
  • .zshrcとか
    export IS_TEST=False
    datetime=`date "+%m%d%H%Y"`
    export WORK_SPACE="gs://data/output/${datetime}"

個人的にはGCEとかでガッツリ回す前に、ローカルで確認のためちょろっと回してみたいが、luigi.cfgは共通のものを使いたいため、非常に重宝しています。

TaskのインスタンスをParameterとしてとる

gokart.(List)TaskInstanceParameterを使うと、TaskのParameterとして、Taskをとれます。これにより、特定のTaskに依存しないTaskを作成することで、同じTaskを使い回すことができ、より柔軟なコードを記述できる可能性が広がります。

    from sklearn.datasets import  load_wine
    from sklearn.linear_model import LogisticRegression


    class LoadWineData(gokart.TaskOnKart):
        def run(self):
            data = load_wine()['data']

            self.dump(data)

    class LoadWineTarget(gokart.TaskOnKart):
        def run(self):
            target = load_wine()['target']

            self.dump(target)


    class Trainer(gokart.TaskOnKart):
        # Taskを引数としてとる
        data_task = gokart.TaskInstanceParameter(description='data for train')
        target_task= gokart.TaskInstanceParameter(description='target for train')

        def requires(self):
            return {'data': self.data_task, 'target': self.target_task}

        def run(self):
            data = self.load('data')
            target = self.load('target')

            model = LogisticRegression()
            model.fit(data, target)

            self.dump(model)


    class ExcuteTrain(gokart.TaskOnKart):
        def requires(self):
            # Taskを注入
            return Trainer(data_task=LoadWineData(), target_task=LoadWineTarget())

        def run(self):
            trained_model = self.load()

            self.dump(trained_model)

Slackに通知

設定ファイルに以下のように記述することで、slackに通知することが可能です。セキュリティの観点から、tokenはベタ書きでなく環境変数に定義したほうが良いでしょう

    [SlackConfig]
    token=${SLACK_TOKEN}    
    channel=channel_name
    to_user=hase_hiro

最後に

挙動が異なる点があれば、ご指摘頂けると幸いです

Hase8388
pythonとScalaで広告システムを開発してます。
fringe81
Fringeは、最新のテクノロジーとプロフェッショナルによるサービスにより、社会課題に仮説を立てて市場に広げていくことで、数十年という長期的なスパンで価値を生み出し続け、より良い世界を創る集団です。 既存の領域に限らず、時流を読み、仮説を生み出し、テクノロジーの力で優れたサービスを生み出し続けます。
https://www.fringe81.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした