はじめに
本記事ではMetaflowとMLflowを使った個人開発用MLOpsの方法をご紹介します。
ここ数ヶ月個人で開発しているMLプロジェクトで利用してみて、いろいろなツールがある中で、MetaflowとMLflowの組み合わせが個人開発では最強であることがわかったのでシェアしたいと思います。
個人でMLの研究、開発されている方のご参考になれば幸いです。
Metaflowとは
まず、MetaflowはNetflixが開発しているオープンソースライブラリで、機械学習用のワークフローパイプラインの機能を提供してくれます。似たようなツールだと、GoogleのKubeflow Pipeline (KFP)、Kedro、LyftのFlyteなんかがあります。
個人的にMetaflowを個人開発で用いるメリットはMetaflowが他のツールと比較して導入がかなり手軽な点です。設定なども特に難しいものはなく使い始めたその日から恩恵を受けることができます。タスクの並列処理なども簡単に実装できたりしちゃいます。
また、Pylintで事前にエラー検知をしてくれたり、途中でエラーで失敗した場所からコードを修正した後で再開する機能などもあり、個人開発の域では十分です。もうこれなしでは生きられない。。。
その他の特徴としては、AWS Batchと互換性があるほか、ロードマップとして近い将来Kubernetesのサポートが開始される予定です。
MLflowとは
MLflowとはオープンソースの機械学習の実験管理ツールです。MLの実験管理といえばW&BかMLflowがデファクト化してきていますのでご存じの方も多いかと思います。
個人的にオープンソースなMLflowの方を好んで使っていますが、もちろんMLflowでもW&Bでも好みのものを使っていただいて構いません。
Metaflowの使い方
こちらはMetaflowのチュートリアルから引用したものです。Metaflowの使い方は至ってシンプルで、FlowSpecを継承したクラスのメソッドにstepデコレータを利用して実行するタスクを登録します。メソッド内のself.next(self.xxx)
の部分でタスク間の依存関係を定義します。また、必ずstart
とend
というstepを作成しなければならないことに注意してください。
from metaflow import FlowSpec, step
class HelloFlow(FlowSpec):
"""
A flow where Metaflow prints 'Hi'.
Run this flow to validate that Metaflow is installed correctly.
"""
@step
def start(self):
"""
This is the 'start' step. All flows must have a step named 'start' that
is the first step in the flow.
"""
print("HelloFlow is starting.")
self.next(self.hello)
@step
def hello(self):
"""
A step for metaflow to introduce itself.
"""
print("Metaflow says: Hi!")
self.next(self.end)
@step
def end(self):
"""
This is the 'end' step. All flows must have an 'end' step, which is the
last step in the flow.
"""
print("HelloFlow is all done.")
if __name__ == '__main__':
HelloFlow()
ファイル作成後、python xxx.py run
と打つとフローが実行されます。
ちなみに、自分の環境で実行した時はこんな感じのログが表示されました。実行するグラフが正常かどうかとPylintによるエラー検知をしてくれているのがわかります。また、ログに現在実行されているstepとpidが表示されており、何気便利です。
Validating your flow...
The graph looks good!
Running pylint...
Pylint is happy!
2022-03-10 17:42:01.108 Workflow starting (run-id 1646901721088290):
2022-03-10 17:42:01.122 [1646901721088290/start/1 (pid 17330)] Task is starting.
2022-03-10 17:42:02.642 [1646901721088290/start/1 (pid 17330)] HelloFlow is starting.
2022-03-10 17:42:02.847 [1646901721088290/start/1 (pid 17330)] Task finished successfully.
2022-03-10 17:42:02.865 [1646901721088290/hello/2 (pid 17333)] Task is starting.
2022-03-10 17:42:04.360 [1646901721088290/hello/2 (pid 17333)] Metaflow says: Hi!
2022-03-10 17:42:04.571 [1646901721088290/hello/2 (pid 17333)] Task finished successfully.
2022-03-10 17:42:04.591 [1646901721088290/end/3 (pid 17336)] Task is starting.
2022-03-10 17:42:06.151 [1646901721088290/end/3 (pid 17336)] HelloFlow is all done.
2022-03-10 17:42:06.349 [1646901721088290/end/3 (pid 17336)] Task finished successfully.
2022-03-10 17:42:06.353 Done!
Metaflow利用時の注意点を一応上げると、step間での変数の受け渡しは基本インスタンス変数(self.xxx)を介して行和なければならなかったり、pickle
できないオブジェクトをstep間で受け渡しを行うようにするとエラーが起きるなどがあります。ただ、その点さえ注意していれば問題なく使えるかと。
Metaflow&MLflow
本題ですが、以下が自分の実際のコードを簡略化したものになります。ポイントがいくつかあるので順を追って説明していきます。
まず、Parameter()ですが、これを用いることでシェルから実行するときに、python xxx.py run --artifact_dir=path/to/dir
という感じで指定できるようになります。
次に、train関数の部分ですが、学習させたモデルのリストをforeachで指定することで並列処理が可能です。並列処理されるevlauate関数のあとはjoinという関数を指定する必要があり、ここで並列処理された結果がまとめて返ってきます。evaluate関数内ではself.input
で分散される入力(今回の場合はmodel)にアクセスすることができます。この辺の詳しい使い方は公式ドキュメントを参考にすると良いかと思います。
個人的には実験単位でFlowを定義してモデルのevaluationを並列実行し、その実験結果をMLflowで追跡していくみたいなことを僕はよくしています。この方法だと、実験レベルでコードを管理できて何かと便利です。
個人開発の際によければご参考に。
from pathlib import Path
import mlflow
from metaflow import FlowSpec, Parameter, step
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from utils import preprocess
class ClsExperimentFlow(FlowSpec):
artifact_dir = Parameter("artifact_dir", default="./data")
def __init__(self, models) -> None:
mlflow.set_experiment("my-experiment")
self._models = models
super().__init__()
@step
def start(self):
self.train_df = pd.read_csv(Path(self.artifact_dir) / "train.csv")
self.test_df = pd.read_csv(Path(self.artifact_dir) / "test.csv")
self.next(self.preprocess)
@step
def preprocess(self):
print("Preprocess")
self.train_df, self.test_df = preprocess(self.train_df, self.test_df)
self.next(self.train)
@step
def train(self):
print("Train")
self.models = [model.train(*self.train_data) for model in self._models]
self.next(self.evaluate, foreach="models")
@step
def evaluate(self):
model = self.input
ret = []
with mlflow.start_run():
# Evaluation & log metrics to mlflow...
self.next(self.join)
@step
def join(self, inputs):
self.next(self.end)
@step
def end(self):
print("Done!!")
def _main():
models = [LogisticRegression(), RandomForestClassifier()]
ClsExperimentFlow(models)
if __name__ == "__main__":
_main()