22
18

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

MetaflowとMLflowを使った快適ML個人開発

Last updated at Posted at 2022-03-10

はじめに

本記事ではMetaflowとMLflowを使った個人開発用MLOpsの方法をご紹介します。
ここ数ヶ月個人で開発しているMLプロジェクトで利用してみて、いろいろなツールがある中で、MetaflowとMLflowの組み合わせが個人開発では最強であることがわかったのでシェアしたいと思います。

個人でMLの研究、開発されている方のご参考になれば幸いです。

Metaflowとは

まず、MetaflowはNetflixが開発しているオープンソースライブラリで、機械学習用のワークフローパイプラインの機能を提供してくれます。似たようなツールだと、GoogleのKubeflow Pipeline (KFP)、Kedro、LyftのFlyteなんかがあります。
個人的にMetaflowを個人開発で用いるメリットはMetaflowが他のツールと比較して導入がかなり手軽な点です。設定なども特に難しいものはなく使い始めたその日から恩恵を受けることができます。タスクの並列処理なども簡単に実装できたりしちゃいます。
また、Pylintで事前にエラー検知をしてくれたり、途中でエラーで失敗した場所からコードを修正した後で再開する機能などもあり、個人開発の域では十分です。もうこれなしでは生きられない。。。

その他の特徴としては、AWS Batchと互換性があるほか、ロードマップとして近い将来Kubernetesのサポートが開始される予定です。

MLflowとは

MLflowとはオープンソースの機械学習の実験管理ツールです。MLの実験管理といえばW&BかMLflowがデファクト化してきていますのでご存じの方も多いかと思います。
個人的にオープンソースなMLflowの方を好んで使っていますが、もちろんMLflowでもW&Bでも好みのものを使っていただいて構いません。

Metaflowの使い方

こちらはMetaflowのチュートリアルから引用したものです。Metaflowの使い方は至ってシンプルで、FlowSpecを継承したクラスのメソッドにstepデコレータを利用して実行するタスクを登録します。メソッド内のself.next(self.xxx)の部分でタスク間の依存関係を定義します。また、必ずstartendというstepを作成しなければならないことに注意してください。

from metaflow import FlowSpec, step


class HelloFlow(FlowSpec):
    """
    A flow where Metaflow prints 'Hi'.
    Run this flow to validate that Metaflow is installed correctly.
    """
    @step
    def start(self):
        """
        This is the 'start' step. All flows must have a step named 'start' that
        is the first step in the flow.
        """
        print("HelloFlow is starting.")
        self.next(self.hello)

    @step
    def hello(self):
        """
        A step for metaflow to introduce itself.
        """
        print("Metaflow says: Hi!")
        self.next(self.end)

    @step
    def end(self):
        """
        This is the 'end' step. All flows must have an 'end' step, which is the
        last step in the flow.
        """
        print("HelloFlow is all done.")

if __name__ == '__main__':
    HelloFlow()

ファイル作成後、python xxx.py runと打つとフローが実行されます。
ちなみに、自分の環境で実行した時はこんな感じのログが表示されました。実行するグラフが正常かどうかとPylintによるエラー検知をしてくれているのがわかります。また、ログに現在実行されているstepとpidが表示されており、何気便利です。

Validating your flow...
    The graph looks good!
Running pylint...
    Pylint is happy!
2022-03-10 17:42:01.108 Workflow starting (run-id 1646901721088290):
2022-03-10 17:42:01.122 [1646901721088290/start/1 (pid 17330)] Task is starting.
2022-03-10 17:42:02.642 [1646901721088290/start/1 (pid 17330)] HelloFlow is starting.
2022-03-10 17:42:02.847 [1646901721088290/start/1 (pid 17330)] Task finished successfully.
2022-03-10 17:42:02.865 [1646901721088290/hello/2 (pid 17333)] Task is starting.
2022-03-10 17:42:04.360 [1646901721088290/hello/2 (pid 17333)] Metaflow says: Hi!
2022-03-10 17:42:04.571 [1646901721088290/hello/2 (pid 17333)] Task finished successfully.
2022-03-10 17:42:04.591 [1646901721088290/end/3 (pid 17336)] Task is starting.
2022-03-10 17:42:06.151 [1646901721088290/end/3 (pid 17336)] HelloFlow is all done.
2022-03-10 17:42:06.349 [1646901721088290/end/3 (pid 17336)] Task finished successfully.
2022-03-10 17:42:06.353 Done!

Metaflow利用時の注意点を一応上げると、step間での変数の受け渡しは基本インスタンス変数(self.xxx)を介して行和なければならなかったり、pickleできないオブジェクトをstep間で受け渡しを行うようにするとエラーが起きるなどがあります。ただ、その点さえ注意していれば問題なく使えるかと。

Metaflow&MLflow

本題ですが、以下が自分の実際のコードを簡略化したものになります。ポイントがいくつかあるので順を追って説明していきます。

まず、Parameter()ですが、これを用いることでシェルから実行するときに、python xxx.py run --artifact_dir=path/to/dirという感じで指定できるようになります。
次に、train関数の部分ですが、学習させたモデルのリストをforeachで指定することで並列処理が可能です。並列処理されるevlauate関数のあとはjoinという関数を指定する必要があり、ここで並列処理された結果がまとめて返ってきます。evaluate関数内ではself.inputで分散される入力(今回の場合はmodel)にアクセスすることができます。この辺の詳しい使い方は公式ドキュメントを参考にすると良いかと思います。

個人的には実験単位でFlowを定義してモデルのevaluationを並列実行し、その実験結果をMLflowで追跡していくみたいなことを僕はよくしています。この方法だと、実験レベルでコードを管理できて何かと便利です。
個人開発の際によければご参考に。

from pathlib import Path

import mlflow
from metaflow import FlowSpec, Parameter, step
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression

from utils import preprocess

class ClsExperimentFlow(FlowSpec):
    artifact_dir = Parameter("artifact_dir", default="./data")

    def __init__(self, models) -> None:
        mlflow.set_experiment("my-experiment")
        self._models = models
        super().__init__()

    @step
    def start(self):
        self.train_df = pd.read_csv(Path(self.artifact_dir) / "train.csv")
        self.test_df = pd.read_csv(Path(self.artifact_dir) / "test.csv")
        self.next(self.preprocess)

    @step
    def preprocess(self):
        print("Preprocess")
        self.train_df, self.test_df = preprocess(self.train_df, self.test_df)
        self.next(self.train)

    @step
    def train(self):
        print("Train")
        self.models = [model.train(*self.train_data) for model in self._models]
        self.next(self.evaluate, foreach="models")

    @step
    def evaluate(self):
        model = self.input
        ret = []
        with mlflow.start_run():
            # Evaluation & log metrics to mlflow...
        self.next(self.join)

    @step
    def join(self, inputs):
        self.next(self.end)

    @step
    def end(self):
        print("Done!!")


def _main():
    models = [LogisticRegression(), RandomForestClassifier()]
    ClsExperimentFlow(models)


if __name__ == "__main__":
    _main()
22
18
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
22
18

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?