gokart とは
gokart はエムスリーが開発している機械学習パイプラインツール。 Spotify により開発されている luigi のラッパーになっていてより簡単に書くことができる。
NLP の機械学習モデルを開発していると前処理、事前学習、ファインチューニング、可視化などなど工程が多く、管理が大変になる。パイプラインツールを使って楽になりたいということで、言語処理100本ノック2020 Rev2の機械学習パートで試してみる (56, 57, 59は gokart 的に新しい操作がないため飛ばす)。
公式情報として gokart は redshells などと組み合わせて使われることが多いようだが、この記事では gokart 自体の動作の理解のため、他のツールは使わずに実装する。
前準備
gokart がどんなものかまずは公式ドキュメントで動作を確かめてみると良い。
またこの記事の最後に参考となるサイトをまとめてあるので参照されたい。
実装
今回実装したコードは以下のリポジトリにある。
以下ではmain.pyから抜粋して説明するので実際は動かし方はリポジトリを参照されたい。最初は gokart の簡単な機能から始めて、徐々に新しい機能を使っていくような形式にした。「51. 特徴量抽出 (パイプラインを利用した特徴量作成)」 のように書いたときに、かっこ内が注目すべき gokart の機能になっている。
gokart はまだ触って2日目なので、より良い書き方があればご教示いただきたい。
処理概要
タスクは文書分類。記事のタイトルから文書のカテゴリを予測するモデルを作る。データのダウンロード、特徴量 (tf-idf) の作成、ロジスティック回帰で学習、予測という流れになる。
50. データの入手・整形 (初めてのタスク、パイプライン作成)
https://nlp100.github.io/ja/ch06.html#50-データの入手整形 に対応。
Zip ファイルのデータをダウンロードしてきて展開するコードをタスクとして記述すると以下のようになる。
class Step50DownloadDatasetTask(gokart.TaskOnKart):
def run(self):
url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00359/NewsAggregatorDataset.zip'
filename = 'NewsAggregatorDataset.zip'
if not os.path.exists(filename):
data = requests.get(url).content
with open(filename ,mode='wb') as f:
f.write(data)
outdirname = 'data/' + filename.replace('.zip', '')
shutil.unpack_archive(filename, outdirname)
colnames = ['ID', 'TITLE', 'URL', 'PUBLISHER', 'CATEGORY', 'STORY', 'HOSTNAME', 'TIMESTAMP']
df = pd.read_csv('data/NewsAggregatorDataset/newsCorpora.csv', header=None, names=colnames, sep='\t', index_col='ID')
self.dump(df)
特に依存するタスクはないので requires()
はなく、 run()
のみ実装している。 pd.DataFrame
を作り、 self.dump(df)
でダンプしている。こうすることで次のタスクで使うことができる。 output()
で保存形式を指定していないので、実態としては pickle として保存されている (./resources/__main__/Step50DownloadDatasetTask_970fe1008c560743d4978f557d70d4b3.pkl
)。
続いてデータを train, validation, test に分ける。
class Step50SplitDatasetTask(gokart.TaskOnKart):
def requires(self):
return Step50DownloadDatasetTask()
def run(self):
df = self.load()
# 50-2: 情報源(publisher)が”Reuters”, “Huffington Post”, “Businessweek”, “Contactmusic.com”, “Daily Mail”の事例(記事)のみを抽出する.
pub_list = ['Reuters','Huffington Post', 'Businessweek', 'Contactmusic.com', 'Daily Mail']
df = df[df['PUBLISHER'].isin(pub_list)]
seed = 12345
# 50-3, 50-4: scikit-learn の train_test_split では2つにしか分割できないため2回に分けて3つに分割する (https://datascience.stackexchange.com/a/15136/126697)
df_train, df_valid_test = train_test_split(df, test_size=0.2, random_state=seed)
df_valid, df_test = train_test_split(df_valid_test, test_size=0.5, random_state=seed)
# 50-4: それぞれtrain.txt,valid.txt,test.txtというファイル名で保存する
# 今回は使わないが一応保存しておく
df_train[['CATEGORY', 'TITLE']].to_csv('output/train.txt', header=None, index=None, sep='\t')
df_valid[['CATEGORY', 'TITLE']].to_csv('output/valid.txt', header=None, index=None, sep='\t')
df_test[['CATEGORY', 'TITLE']].to_csv('output/test.txt', header=None, index=None, sep='\t')
# 学習データと評価データを作成したら,各カテゴリの事例数を確認せよ.
print('Train Data')
print(df_train['CATEGORY'].value_counts())
print('Validation Data')
print(df_valid['CATEGORY'].value_counts())
print('Test Data')
print(df_test['CATEGORY'].value_counts())
self.dump((df_train, df_valid, df_test))
requires()
で Step50DownloadDatasetTask()
を指定し、 run()
の中で df = self.load()
のようにすると、 Step50DownloadDatasetTask()
で dump したものが読み込まれる。このようにしてパイプラインを作っていく。依存関係は gokart が面倒見てくれるため、上流のタスクを再実行した場合は下流のタスクも再実行されるようになる。
データを分割したあと、 self.dump((df_train, df_valid, df_test))
でそれぞれ dump して、次のタスクで使えるようにする。
なお分割するときに乱数のシードを固定しているが、恐らく gokart 側でも固定してくれるはず。
51. 特徴量抽出 (パイプラインを利用した特徴量作成)
https://nlp100.github.io/ja/ch06.html#51-特徴量抽出 に対応。
特徴量として今回は、記事タイトルから tf-idf を作ることにする。
class Step51ExtractFeatureTask(gokart.TaskOnKart):
def requires(self):
return Step50SplitDatasetTask()
def run(self):
df_train, df_valid, df_test = self.load()
# 10回以上出現する unigram, bi-gram について計算
vec_tfidf = TfidfVectorizer(min_df=10, ngram_range=(1, 2))
# valid は tf-idf を計算するための train data に含めても良いが、今回はやらない
tfidf_train = vec_tfidf.fit_transform(df_train['TITLE'])
tfidf_valid = vec_tfidf.transform(df_valid['TITLE'])
tfidf_test = vec_tfidf.transform(df_test['TITLE'])
# DataFrame に変換
df_train = pd.DataFrame(tfidf_train.toarray(), columns=vec_tfidf.get_feature_names_out())
df_valid = pd.DataFrame(tfidf_valid.toarray(), columns=vec_tfidf.get_feature_names_out())
df_test = pd.DataFrame(tfidf_test.toarray(), columns=vec_tfidf.get_feature_names_out())
# 今回は使用しないが一応保存
df_train.to_csv('output/train.feature.txt', index=None, sep='\t')
df_valid.to_csv('output/valid.feature.txt', index=None, sep='\t')
df_test.to_csv('output/test.feature.txt', index=None, sep='\t')
self.dump((df_train, df_valid, df_test))
同様に requires()
で Step50SplitDatasetTask()
を指定し、run()
中で df_train, df_valid, df_test = self.load()
とすることによって分割したデータをロードする。tf-idf を計算したあとはまた dump する。
52. 学習 (2つのタスクへの依存関係の記述)
https://nlp100.github.io/ja/ch06.html#52-学習 に対応。
class Step52TrainTask(gokart.TaskOnKart):
def requires(self):
return {'data': Step50SplitDatasetTask(), 'feature': Step51ExtractFeatureTask()}
def run(self):
df_train, _, _ = self.load('data')
X_train, _, _ = self.load('feature')
y_train = df_train['CATEGORY']
model = LogisticRegression(random_state=123, max_iter=10000)
model.fit(X_train, y_train)
self.dump(model)
Scikit-learn のロジスティック回帰で学習する。学習するには train データから正解ラベルを読み込み、さらに作成した特徴量も読み込む必要がある。そのため requires を辞書型で複数指定する (cf. https://gokart.readthedocs.io/en/latest/task_on_kart.html#taskonkart-load)。
ロードの部分は df_train, _, _ = self.load('data')
のようになる (validation, test は使わないので捨てている)。学習したモデルはいつも通り dump する (self.dump(model)
)。
53. 予測 (辞書型を使った dump)
https://nlp100.github.io/ja/ch06.html#53-予測 に対応。
class Step53PredictTask(gokart.TaskOnKart):
"""
学習したモデルで記事タイトルからカテゴリとその予測確率を計算する。
52と同様に複数指定してロードする。
また、保存 (dump) も辞書型でやってみる。 (https://gokart.readthedocs.io/en/latest/task_on_kart.html#taskonkart-dump)
"""
def output(self):
return {'pred': self.make_target('pred.pkl'), 'prob': self.make_target('prob.pkl')}
def requires(self):
return {
'data': Step50SplitDatasetTask(),
'feature': Step51ExtractFeatureTask(),
'model': Step52TrainTask()
}
def run(self):
_, _, df_test = self.load('data')
_, _, X_test = self.load('feature')
model = self.load('model')
y_test = df_test['CATEGORY']
pred_test = model.predict(X_test)
prob_test = model.predict_proba(X_test)
self.dump(pred_test, 'pred')
self.dump(prob_test, 'prob')
requires()
で複数しているのは前回と同様。今回は予測したあとにカテゴリ予測結果と、各カテゴリの確率を保存した numpy.array
を dump したい。これを実現するには output()
で
def output(self):
return {'pred': self.make_target('pred.pkl'), 'prob': self.make_target('prob.pkl')}
のようにそれぞれの保存パスを指定し、 run()
内で
self.dump(pred_test, 'pred')
self.dump(prob_test, 'prob')
のようにすることで dump できる。
54. 正解率の計測 (テキスト形式での保存)
https://nlp100.github.io/ja/ch06.html#54-正解率の計測 に対応。
class Step54CalcAccuracyTask(gokart.TaskOnKart):
def output(self):
return self.make_target('test_accuracy.txt')
def requires(self):
return {'data': Step50SplitDatasetTask(), 'pred': Step53PredictTask()}
def run(self):
_, _, df_test = self.load('data')
pred_test = self.load('pred')['pred']
gt_test = df_test['CATEGORY']
test_accuracy = accuracy_score(gt_test, pred_test)
self.dump(test_accuracy)
これまで計算結果などは全て pickle として保存してきたが、後々確認しやすいようテキストファイルで保存したい。そのためには output()
内で
def output(self):
return self.make_target('test_accuracy.txt')
のように保存先を指定し、 run()
内で
self.dump(test_accuracy)
のようにする。保存されたファイルの中身は str(test_accuracy)
になっているので、桁数などは適宜調整する必要がある。
55. 混同行列の作成 (画像の保存)
https://nlp100.github.io/ja/ch06.html#55-混同行列の作成 に対応
class Step55CalcConfusionMatrixTask(gokart.TaskOnKart):
def output(self):
return self.make_target('test_confusion_matrix.png')
def requires(self):
return {'data': Step50SplitDatasetTask(), 'pred': Step53PredictTask()}
def run(self):
_, _, df_test = self.load('data')
pred_test = self.load('pred')['pred']
gt_test = df_test['CATEGORY']
test_cm = confusion_matrix(gt_test, pred_test)
fig = plt.figure()
sns.heatmap(test_cm, annot=True, cmap='Blues')
figbin = BytesIO()
fig.savefig(figbin, format='png')
self.dump(figbin.getvalue())
混同行列を作成し画像として保存する。そのために output()
で self.make_target('test_confusion_matrix.png')
のように指定し、 run()
内で、 self.dump(figbin.getvalue())
のようにした。matplotlib の matplotlib のプロットの画像データを取得するために BytesIO を使っており、ちょっと面倒なコードになっている。
もう少し簡単に書く方法がないか調べていると以下の記事でよりスマートな書き方がされていた (PNGFileProcessor を実装)。
NLP の場合は下流で画像を使うことはあまりなさそうなのでわざわざ dump せずに普通に保存すれば良いかも。
58. 正則化パラメータの変更 (luigi パラメータの使用)
https://nlp100.github.io/ja/ch06.html#58-正則化パラメータの変更 に対応。
パラメータチューニング (ロジスティック回帰のパラメータ C を変化させて学習するだけ) を gokart を使ってやってみる。この部分はほんとにこれで良いのかあまり自信がない。 gokart だけでなく、 上記の記事のように redshells や optuna も併用してやるのが良さそう。
まずパラメータを変えて学習するためのクラスを実装した。
class TrainLogisticTask(gokart.TaskOnKart):
C = luigi.FloatParameter(default=0.1)
def output(self):
return self.make_target(f"model-C{self.C}.pkl")
def requires(self):
return {'data': Step50SplitDatasetTask(), 'feature': Step51ExtractFeatureTask()}
def run(self):
df_train, _, _ = self.load('data')
X_train, _, _ = self.load('feature')
y_train = df_train['CATEGORY']
print(f"Training C = {self.C}")
model = LogisticRegression(random_state=123, max_iter=10000, C=self.C)
model.fit(X_train, y_train)
self.dump(model)
タスクにパラメータを渡すには luigi.Parameter
を使う。公式ドキュメント (https://gokart.readthedocs.io/en/latest/task_parameters.html) であまり理解できなかったが、おそらく gokart のパラメータはタスクの管理用で、ハイパーパラメータなど用ではなさそう。
C = luigi.FloatParameter(default=0.1)
の部分でパラメータ用の変数を作成し、 run()
内の
model = LogisticRegression(random_state=123, max_iter=10000, C=self.C)
で利用している。このタスクに対し、パラメータを渡すタスクを以下で定義した。
class Step58ChangeRegularizationParameterTask(gokart.TaskOnKart):
parameters = np.logspace(-5, 3, 9, base=10)
def output(self):
return self.make_target('param_C.png')
def requires(self):
tasks = {}
for C in self.parameters:
task_name = f"Logistic(C={C})"
tasks[task_name] = TrainLogisticTask(C=C)
tasks['data'] = Step50SplitDatasetTask()
tasks['feature'] = Step51ExtractFeatureTask()
return tasks
def run(self):
# test data をロード
_, _, df_test = self.load('data')
_, _, X_test = self.load('feature')
gt_test = df_test['CATEGORY']
# 各モデルをロードして予測
accuracies = []
for C in self.parameters:
task_name = f"Logistic(C={C})"
model = self.load(task_name)
pred_test = model.predict(X_test)
accuracies.append(accuracy_score(gt_test, pred_test))
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
ax.set_xscale('log')
ax.plot(self.parameters, accuracies)
figbin = BytesIO()
fig.savefig(figbin, format='png')
self.dump(figbin.getvalue())
以下の部分で 1e-5 から 1e3 までで C を変えるように指定している。
parameters = np.logspace(-5, 3, 9, base=10)
これを以下のように上で定義した TrainLogisticTask
に渡している。
tasks[task_name] = TrainLogisticTask(C=C)
これで一応は動作したが、あまりきれいではない気がする。 特に学習を回しているのが run()
ではなく requires()
のところなのがなんとなく気持ち悪い。ただパラメータを指定できるのが requires()
内だけのようなので他に書きようがない。
また、各学習は並列で動かしたいが、今の知識ではやり方が分からなかった。
タスク依存関係の可視化
luigi ではタスクの依存関係がグラフで見れる。 luigid
を立ち上げて、 --local-scheduler
を指定せずに gokart.run()
を実行するとスケジューラが luigid になっているようだがグラフは表示されなかった。
gokart にも依存関係を表示する機能があるので試してみる。
class RunAllTask(gokart.TaskOnKart):
""" タスクが全部実行されるようにするためのタスク。
"""
task_list = [
Step54CalcAccuracyTask(),
Step55CalcConfusionMatrixTask(),
Step58ChangeRegularizationParameterTask()
]
def requires(self):
return {str(i): task for i, task in enumerate(RunAllTask.task_list)}
print(gokart.make_task_info_as_tree_str(RunAllTask()))
以下のように表示される。
└─-(PENDING) RunAllTask[501fde1a4111ceb9f7bfc844dcafbffa]
|--(COMPLETE) Step54CalcAccuracyTask[e06960c53213b7b17c7229518b5e52bc]
| |--(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| | └─-(COMPLETE) Step50DownloadDatasetTask[970fe1008c560743d4978f557d70d4b3]
| └─-(COMPLETE) Step53PredictTask[b81fc92caafd5529b9d588f300a11f75]
| |--(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| | └─- ...
| |--(COMPLETE) Step51ExtractFeatureTask[3bfbcd1308c9c08430d87cd9e9f9e09b]
| | └─-(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| | └─- ...
| └─-(COMPLETE) Step52TrainTask[f46efdf76d4899d10e81026c814c5013]
| |--(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| | └─- ...
| └─-(COMPLETE) Step51ExtractFeatureTask[3bfbcd1308c9c08430d87cd9e9f9e09b]
| └─- ...
|--(COMPLETE) Step55CalcConfusionMatrixTask[1e78f8b628e37e7a7a40f399e769d800]
| |--(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| | └─- ...
| └─-(COMPLETE) Step53PredictTask[b81fc92caafd5529b9d588f300a11f75]
| └─- ...
└─-(PENDING) Step58ChangeRegularizationParameterTask[3b7173bc60d66d34736d245224e903dc]
|--(COMPLETE) TrainLogisticTask[69fd35ca06b33a06350e56dd1681bc55]
| |--(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| | └─- ...
| └─-(COMPLETE) Step51ExtractFeatureTask[3bfbcd1308c9c08430d87cd9e9f9e09b]
| └─- ...
|--(COMPLETE) TrainLogisticTask[3592d3c26ae512029d1c1ddc19df6b76]
| |--(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| | └─- ...
| └─-(COMPLETE) Step51ExtractFeatureTask[3bfbcd1308c9c08430d87cd9e9f9e09b]
| └─- ...
|--(COMPLETE) TrainLogisticTask[275dc1c15c255e59197d13d2e1d3c470]
| |--(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| | └─- ...
| └─-(COMPLETE) Step51ExtractFeatureTask[3bfbcd1308c9c08430d87cd9e9f9e09b]
| └─- ...
|--(COMPLETE) TrainLogisticTask[828f6c76bb5c5f67546b38ae4eb71838]
| |--(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| | └─- ...
| └─-(COMPLETE) Step51ExtractFeatureTask[3bfbcd1308c9c08430d87cd9e9f9e09b]
| └─- ...
|--(COMPLETE) TrainLogisticTask[8b8fa4ab9a1e6e59450b0ae63ef9a3f6]
| |--(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| | └─- ...
| └─-(COMPLETE) Step51ExtractFeatureTask[3bfbcd1308c9c08430d87cd9e9f9e09b]
| └─- ...
|--(COMPLETE) TrainLogisticTask[06d632cc3e5dbbac30a0742abb564a49]
| |--(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| | └─- ...
| └─-(COMPLETE) Step51ExtractFeatureTask[3bfbcd1308c9c08430d87cd9e9f9e09b]
| └─- ...
|--(COMPLETE) TrainLogisticTask[844e78f02bd40b4a6b119072e7ae33b9]
| |--(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| | └─- ...
| └─-(COMPLETE) Step51ExtractFeatureTask[3bfbcd1308c9c08430d87cd9e9f9e09b]
| └─- ...
|--(COMPLETE) TrainLogisticTask[365900fabc31d22560498e8b90233438]
| |--(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| | └─- ...
| └─-(COMPLETE) Step51ExtractFeatureTask[3bfbcd1308c9c08430d87cd9e9f9e09b]
| └─- ...
|--(COMPLETE) TrainLogisticTask[1b1281b0fd1a3241b85460baf6abe46d]
| |--(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| | └─- ...
| └─-(COMPLETE) Step51ExtractFeatureTask[3bfbcd1308c9c08430d87cd9e9f9e09b]
| └─- ...
|--(COMPLETE) Step50SplitDatasetTask[c796f42a5b0ac86fdc966c77ce0cb159]
| └─- ...
└─-(COMPLETE) Step51ExtractFeatureTask[3bfbcd1308c9c08430d87cd9e9f9e09b]
感想
NLPモデルを作っていて日々大量のシェルスクリプトや中間ファイルに悩まされているので、それらを一元化できそうな gokart に魅力を感じた。さらに cookiecutter や redshells と組み合わせることで再利用性が高まり、ディレクトリ構造など本質的でない部分で頭を悩ませる場面が減りそう。
ただそれなりに学習コストはありそうなので、他人にコードを共有するときにこれ使ってと渡すのには少し憚られそうな気がした。またメトリックの監視に MLFlow を使っているので、それらのツールと組み合わせるうまい方法があれば知りたい。そもそも他のパイプラインツールも使ったことないので gokart のありがたみが分かっていない。他のもお試ししたい。
参考リンク
-
機械学習プロジェクト向けPipelineライブラリgokartを用いた開発と運用 - エムスリーテックブログ
エムスリー公式なのでドキュメントとこれをまず読むのが良いと思われる -
gokartを使ってみる - Re:ゼロから始めるML生活
題材が NLP (文書分類) なので参考にしやすい -
【Techの道も一歩から】第42回「Luigiとgokartを試用して比べて特徴を掴む」 - Sansan Tech Blog
luigi と gokart の比較が簡潔にまとまっていて分かりやすい -
PythonのPipelineパッケージ比較:Airflow, Luigi, Gokart, Metaflow, Kedro, PipelineX - Qiita
Gokart 以外のパイプラインツールもまとめた力作 -
gokart, redshellsによるMLOpsへの第一歩 - Qiita
ドキュメントで扱われていない部分のコードの書き方がとても参考になる -
【言語処理100本ノック 2020】第6章: 機械学習【Python】 - Amaru Note
100本ノックのコードを書くにあたり大いに参考にさせて頂いた