これなに?
Luigiのラッパーライブラリであるgokartについてのまとめ記事です。
開発の動機や、基本的な使い方はM3さんのブログにとても丁寧にまとまっており、基本的な使い方はここを読めばよいやんという話ですが、逆引きリファレンス的にまとめたかったので、記事にしました。
なお、Luigi本体の機能についてはあまり説明しません。
Luigiとは?
Spotifyが開発しているPipelineフレームワークのOSSの一種です。Pythonで実装されており、luigi.Task
を継承して
-
requires()
: 依存しているTask -
run()
: 実行する処理 -
output()
: 出力先
の3つのメソッドを記述するだけで、簡単にワークフローが作れます。
名前の由来は
Also it should be mentioned that Luigi is named after the pipeline-running friend of Super Mario.
とのことです
gokartとは?
そのLuigiをより使いやすくしたラッパーライブラリがgokartです。
名前の由来はたぶんマリオ(カート)からですね。
基本
以下、gokart==0.3.6
の機能についてまとめます。
Taskの構築
タスクを作成する際は、luigi.Task
ではなく、gokart.TaskOnKart
を継承して作成します。
import gokart
class TaskA(gokart.TaskOnKart):
def run(self):
data= pd.DataFrame(load_iris()['data'])
self.dump(data)
class TaskB(gokart.TaskOnKart):
def reuires(self):
return TaskA()
# outputは省略可能
def output(self):
return self.make_target('data.pkl')
def run(self):
df =self.load()
self.dump(df)
基本的な使い方はLuigiと変わりませんが、self.dump(保存したいobject)
とするだけでよいので、Luigi単体で同じ処理をするのと比べ、だいぶ簡略化できます。さらに、def output(self)
も省略でき、その場合はpickle
形式で保存されます。
実行
以下のようにして実行します。
gokart.run(['TaskB', '--local-scheduler'])
実行すると、以下のようにresources
配下にobjectが保存されます。
resources
├── data_3eba828ec57403111694b0fb1e3f62e0.pkl
└── log
├── module_versions
│ └── TaskB_3eba828ec57403111694b0fb1e3f62e0.txt
├── processing_time
│ └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
├── task_log
│ └── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
└── task_params
└── TaskB_3eba828ec57403111694b0fb1e3f62e0.pkl
指定したファイル名にhash値がついて保存されてます。hash値はTaskのパラメータによって決まるので、**Luigi単体の場合と異なり、パラメータを変えるとTaskが再実行されます。**これもgokartのメリットの一つです。(後述しますが、hash値をつけないでの保存も可能です)
logには、
- 利用したモジュールのバージョン
- 処理にかかった時間
- loggerで出力されたlog
- 適用されたパラメータ
が保存されています。
保存先のPATHの指定
デフォルトだと、resources
配下に保存されますが、保存先のディレクトリは設定ファイルに
[TaskOnKart]
workspace_directory=./output
のように指定することで変更可能です。
load
DataFrameを結合してからload
self.load()
でももちろん、保存されたDataFrameをloadできますが、[df1, df2, df3...]
のようなDataFrameの集合をloadする場合、load_dataframe
を使用すると、複数のDataFrameが縦に結合された状態でloadできます。
また、オプションでset
でカラムを指定すると、そのカラムがloadするDataFrame
に存在しない場合、例外を発生させることもできます。
class TaskA(gokart.TaskOnKart):
def run(self):
df1 = pd.DataFrame([1,2], columns=['target'])
df2 = pd.DataFrame([3,4], columns=['target'])
df3 = pd.DataFrame([5,6], columns=['target'])
self.dump([df1, df2, df3])
class TaskB(gokart.TaskOnKart):
def requires(self):
return TaskA()
def run(self):
# concatされてからloadされる
df =self.load_data_frame(required_columns={'target'})
self.dump(df)
keyで指定してload
依存しているTaskが複数ある場合、以下のように辞書形式で依存するTaskを定義し、keyで読み込むことができます。Luigi単体でも複数のTaskをloadできますが、辞書の形はサポートしていないため、辞書形式を使うことで、コードの可読性を上げることができます。
class TrainModel(gokart.TaskOnKart):
def requires(self):
return {'data': LoadData(), 'target': LoadTarget()}
def run(self):
data = self.load('data')
target = self.load('target')
model = LogisticRegression()
model.fit(data, target)
self.dump(model)
逐次的にload
self.load_generator
を使うと、Taskを逐次的にloadして処理することができます。
from sklearn.datasets import load_iris
from sklearn.datasets import load_wine
class LoadWineData(gokart.TaskOnKart):
def run(self):
data = load_wine()['data']
self.dump(data)
class LoadIrisData(gokart.TaskOnKart):
def run(self):
data = load_iris()['data']
self.dump(data)
class LoadGenerator(gokart.TaskOnKart):
def requires(self):
return [LoadWineData(), LoadIrisData()]
def run(self):
for data in self.load_generator():
print(f'data_shape={data.shape}')
# data_shape=(178, 13)
# data_shape=(150, 4)
output
hash値を付けないで保存
use_unique_id=False
とすれば、ファイル名にhash値が付随しません。
def output(self):
return self.make_target('data.pkl', use_unique_id=False)
複数ファイルにまたがるモデルを保存
gensimやTensorFlowなど、モデルが複数ファイルにまたがって保存される形式の場合、以下のようにmake_model_target
を使うと、まとめて圧縮して保存できます。
def output(self):
return self.make_model_target(
'model.zip',
save_function=gensim.model.Word2Vec.save,
load_function=gensim.model.Word2Vec.load)
のように、パラメータとして、保存、復元用の関数を渡すことで、(今回の場合は)zip形式でモデルとload_function
がセットで圧縮されて保存され、呼び出し側のTaskは特に気にすることなく、self.load()
でモデルを復元できます。
巨大なDataFrameを保存
以下のようにmake_large_data_frame_target
を使うと、max_byte
で指定した容量ごとにDataFrame
がレコード単位で複数に分割され、一つに圧縮された後、保存されます。
def output(self):
return self.make_large_data_frame_target('large_df.zip', max_byte=2**10)
ちなみに内部では前述のmake_model_target
が使用されています。
DataFrameを様々な形式で保存
pickle
以外の形式でDataFrame
を変換した上で保存したい場合、その形式の拡張子をつけるだけで、内部のFileProcessor
により、対象の形式に変換された上で保存されます。
現在サポートしている形式は、
- pickle
- npz
- gz
- txt
- csv
- tsv
- json
- xml
です。
class LoadWineData(gokart.TaskOnKart):
def run(self):
data = load_wine()['data']
self.dump(data)
class ToCSV(gokart.TaskOnKart):
def requires(self):
return LoadWineData()
def output(self):
# suffixに保存したい拡張子を定義
return self.make_target('./wine.csv')
def run(self):
df = pd.DataFrame(self.load())
self.dump(df)
保存先をGCSやS3に指定
設定ファイルに記述したwork_space_directory
のpathの先頭がgs://
の場合はGCSに、s3://
の場合、S3にすべての出力結果がuploadされます.
[TaskOnKart]
# prefixをgs://またはs3://とすると、すべてのoutputがクラウド上に保存される
workspace_directory=gs://resources/
Luigi単体の場合と違いコードに一切手を入れずに変更できるため、非常に便利です。
その他
環境変数からParameterを指定
parameter=${環境変数}
と設定ファイルに記述することで、環境変数でTaskのParameterを指定できます。
テストと本番を切り分けたり、実行する環境ごとにクラウドに保存するか否かを変更したいときなど、非常に便利です。
- 設定ファイル
[TaskOnKart]
workspace_directory=${WORK_SPACE}
[feature.LoadTrainTask]
is_test=${IS_TEST}
- .zshrcとか
export IS_TEST=False
datetime=`date "+%m%d%H%Y"`
export WORK_SPACE="gs://data/output/${datetime}"
個人的にはGCEとかでガッツリ回す前に、ローカルで確認のためちょろっと回してみたいが、luigi.cfg
は共通のものを使いたいため、非常に重宝しています。
TaskのインスタンスをParameterとしてとる
gokart.(List)TaskInstanceParameter
を使うと、TaskのParameterとして、Taskをとれます。これにより、特定のTaskに依存しないTaskを作成することで、同じTaskを使い回すことができ、より柔軟なコードを記述できる可能性が広がります。
from sklearn.datasets import load_wine
from sklearn.linear_model import LogisticRegression
class LoadWineData(gokart.TaskOnKart):
def run(self):
data = load_wine()['data']
self.dump(data)
class LoadWineTarget(gokart.TaskOnKart):
def run(self):
target = load_wine()['target']
self.dump(target)
class Trainer(gokart.TaskOnKart):
# Taskを引数としてとる
data_task = gokart.TaskInstanceParameter(description='data for train')
target_task= gokart.TaskInstanceParameter(description='target for train')
def requires(self):
return {'data': self.data_task, 'target': self.target_task}
def run(self):
data = self.load('data')
target = self.load('target')
model = LogisticRegression()
model.fit(data, target)
self.dump(model)
class ExcuteTrain(gokart.TaskOnKart):
def requires(self):
# Taskを注入
return Trainer(data_task=LoadWineData(), target_task=LoadWineTarget())
def run(self):
trained_model = self.load()
self.dump(trained_model)
Slackに通知
設定ファイルに以下のように記述することで、slackに通知することが可能です。セキュリティの観点から、tokenはベタ書きでなく環境変数に定義したほうが良いでしょう
[SlackConfig]
token=${SLACK_TOKEN}
channel=channel_name
to_user=hase_hiro
最後に
挙動が異なる点があれば、ご指摘頂けると幸いです