LoginSignup
11
8

More than 1 year has passed since last update.

mlflowとluigiによるML実験管理例

Last updated at Posted at 2021-12-10

はじめに

本記事ではMLの実験を行うときの、コード、パラメータ、モデル、評価結果を管理するための構成例を紹介します。
サンプルコードはこちら

前提知識

  • Must
    • python
    • docker
  • Want
    • mlflow
    • luigi

思想

  • 前処理を加えたデータや学習したモデルなどプログラムで出力されるファイルは全てmlflowの管理下におく。
  • コードはgitで管理し、実験結果とcommit hashを紐づける。
  • 前処理、学習、推論などタスク同士の依存関係を管理して、依存しているタスクを自動で実行できるようにする。また、既に実行されたタスクは実行しないようにする。

構成概要

  • titanicのdataに対して、前処理、学習、推論を行う例を紹介する。
  • ディレクトリ構成は以下のような感じ。
  • src/tasks/下に前処理などの具体的なタスクを行うファイルを作成する。
  • tomlファイルで実行するタスクを指定してsrc/runner.pyを実行する。
  • 各タスクの出力は、mlruns下のmlflowの出力ファイルの管理先のデフォルト位置に実験ごとに出力する。
ディレクトリ構成
.
├── Dockerfile
├── LICENSE
├── README.md
├── docker-compose.yml
├── input
│   ├── gender_submission.csv
│   ├── test.csv
│   ├── titanic.zip
│   └── train.csv
├── luigi.toml
├── mlruns/
├── requirements.txt
└── src
    ├── runner.py
    └── tasks
        ├── prediction.py
        ├── preprocessor.py
        ├── run_task.py
        └── trainer.py

環境構築

  • mlflowなど必要なライブラリを入れるDockerfileを適当に作成して、下記のようなdocker-compose.ymlを用意して、プロジェクト直下でdocker-compose up dなどでmlflow trackingサーバを立ち上げる。
  • mlflow trackingでは、サーバを立ち上げるときにmetricなどを管理するTracking URIと、出力されるファイルを管理するArtifact URIという2つのURIを指定することができる。デフォルトではどちらも./mlruns/下に保存される。
  • サーバを立ち上げた後{サーバのIP}:{指定したポート番号}をブラウザで開くことでアクセスできる。
docker-compose.yml
version: "3"
services:
  mlflow:
    image: ml_experiment_management_demo
    container_name: ml_experiment_management_demo_mlflow
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - $PWD:$PWD
    working_dir: $PWD
    ports:
      - "5000:5000"
    command: mlflow ui -h 0.0.0.0
    restart: always

コードの構成

.envに、プロジェクトのパス、利用するconfigのパス、configの種類を指定する。

.env
PROJECT_ROOT={プロジェクトのパスを指定}
LUIGI_CONFIG_PATH=luigi.toml
LUIGI_CONFIG_PARSER=toml
  • luigiの機能で環境変数LUIGI_CONFIG_PATHに指定したファイルをconfigとして読み込むことができる。ここではluigi.tomlとしている。
  • RunConfigはrunner.pyで利用するconfigで、実験名や概要、実行するtaskを指定する設計になっている。
  • PathConfigでは各タスクの入出力先ディレクトリのpathを定義する。
  • 前処理、学習、推論の各タスクのconfigはPreprocessor、Trainer、Predictionに定義する。
luigi.toml
[RunConfig]
#実験名:mlflowの実験単位
experiment_name="demo"
description="概要"
# 実行するTask
task="Prediction"

[PathConfig]
# input
input="./input/"

# Preprocessorのoutput、Trainer、Evaluatorのinput
# デフォルトはartifact_uri下、"./mlruns/{experiment_id}/{run_id}/artifacts/preprocessor/"となる。
#preprocessor="./mlruns/"

# Trainerのoutput、Evaluatorのinput
# デフォルはトartifact_uri下"./mlruns/{experiment_id}/{run_id}/artifacts/trainer/"となる。
#trainer="./mlruns/"

# Predictionのoutput、
# デフォルトはartifact_uri下"./mlruns/{experiment_id}/{run_id}/artifacts/prediction/"となる。
#trainer="./mlruns/"

[Preprocessor]
train_columns=['PassengerId','Survived', 'Pclass', 'SibSp', 'Parch']
test_columns=['PassengerId','Pclass', 'SibSp', 'Parch']

[Trainer]
seed=0

[Prediction]

  • runner.pyが実際に実行するスクリプトになる。
  • .envとRunConfigを読み込んで指定したtaskを実行する。
  • experiment_nameを実験名として設定して、利用したconfigファイルや実行するtaskをlogとして記録している。詳しくはmlflow trackingのドキュメント参照。
runner.py
import os
import sys

import luigi
import mlflow
from dotenv import load_dotenv
from luigi.configuration import add_config_path, get_config

from tasks.prediction import Prediction
from tasks.preprocessor import Preprocessor
from tasks.trainer import Trainer


def main():
    load_dotenv()
    sys.path.append(f"{os.getenv('PROJECT_ROOT')}src/")
    add_config_path(os.getenv("LUIGI_CONFIG_PATH"))
    run_config = get_config(os.getenv("LUIGI_CONFIG_PARSER"))["RunConfig"]
    mlflow.set_experiment(run_config["experiment_name"])
    mlflow.start_run()
    mlflow.set_tag('mlflow.note.content', run_config["description"])
    mlflow.log_param("description", run_config["description"])
    mlflow.log_param("task", run_config["task"])
    mlflow.set_tag('mlflow.runName', mlflow.active_run().info.run_id)
    mlflow.log_artifact(os.getenv("LUIGI_CONFIG_PATH"))
    luigi.run([run_config["task"], "--local-scheduler"])
    mlflow.end_run()


if __name__ == '__main__':
    main()

タスクの定義

  • runner.pyで実行するタスクはRunTaskクラスを継承して作成する。
  • RunTaskクラスでは、PathConfigで各タスクの出力先ディレクトリのpathが指定されているかを確認し、指定されていない場合./mlruns/{experiment_id}/{run_id}/artifacts/{task_name}/に設定する。
  • build_input_output_path_dictsはサブクラスで入出力pathの一覧を返すように実装することを想定しており、サブクラスでは設定されたpath_configを用いて、入出力のプロジェクトルートからの相対パスか絶対パスを返すように実装する。
  • RunTaskクラスはluigiのTaskを継承しており、outputメソッドでoutput_paths_dictの値をluigiのLocalTargetとして指定する。
  • luigi.Taskの機能でコンストラクタの実行後outputメソッドで指定したファイルが既にある場合、実行済のタスクと見なされる。これにより、RunTaskを継承した各タスクはoutput_paths_dictに設定したファイルが既にあれば、実行しなくて済むようになっている。
  • また、PathConfigを指定しなかった場合、run_idは毎回変わるのでそのタスクは必ず実行される。
run_task.py
import os
from abc import abstractmethod
from typing import Tuple

import luigi
import mlflow
from luigi.configuration import get_config


class RunTask(luigi.Task):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.config = get_config(
            os.getenv("LUIGI_CONFIG_PARSER")
        )[self.__class__.__name__]

        experiment_id = mlflow.active_run().info.experiment_id
        run_id = mlflow.active_run().info.run_id

        self.path_config = get_config(
            os.getenv("LUIGI_CONFIG_PARSER"))["PathConfig"]
        self.artifacts_uri_path = f"./mlruns/{experiment_id}/{run_id}/artifacts/"

        # タスクごとにPathConfigが指定されていなければデフォルト値設定
        # デフォルトパス ./mlruns/{experiment_id}/{run_id}/artifacts/{task_name}/
        configs_keys = get_config(os.getenv("LUIGI_CONFIG_PARSER")).data.keys()
        task_list = [i for i in configs_keys if i not in [
            "RunConfig", 'PathConfig']]
        for task_name in task_list:
            self.path_config.setdefault(
                task_name,
                f"{self.artifacts_uri_path}{task_name}/")
        # 入出力パス output_paths_dictのvaluesのパスにファイルが既にある場合、そのタスクは行われない
        self.input_paths_dict, self.output_paths_dict = self.build_input_output_path_dicts()
        for output_path in self.output_paths_dict.values():
            os.makedirs(os.path.dirname(output_path), exist_ok=True)

    @abstractmethod
    def build_input_output_path_dicts(self) -> Tuple[dict, dict]:
        """
        クラスの入出力パスをそれぞれdictで返す

        Returns:
            input_paths_dict,output_paths_dict
        """
        pass

    def output(self):
        return list(map(lambda x: luigi.LocalTarget(x),
                        self.output_paths_dict.values()))

前処理

  • RunTaskの説明で述べたように、前処理のタスクPreprocessorクラスはRunTaskを継承して作成する。
  • build_input_output_path_dictsでpath_configの値を利用して、入出力pathを設定している。
  • runメソッドがtaskの実行する処理で、ここでは前処理の内容を実装している。
  • ここで行っている前処理は、シンプルにtrain、testをconfigで指定したcolumnsのみにしているだけである。
preprocessor.py
from typing import Tuple

import pandas as pd

from tasks.run_task import RunTask


class Preprocessor(RunTask):
    def build_input_output_path_dicts(self) -> Tuple[dict, dict]:
        input_paths_dict = {
            "train": f"{self.path_config['input']}train.csv",
            "test": f"{self.path_config['input']}test.csv",
        }
        output_paths_dict = {
            "train": f"{self.path_config['Preprocessor']}train.csv",
            "test": f"{self.path_config['Preprocessor']}test.csv",
        }
        return input_paths_dict, output_paths_dict

    def run(self):
        train = pd.read_csv(self.input_paths_dict["train"])
        test = pd.read_csv(self.input_paths_dict["test"])
        train = train[self.config["train_columns"]]
        test = test[self.config["test_columns"]]
        train.to_csv(self.output_paths_dict["train"], index=False)
        test.to_csv(self.output_paths_dict["test"], index=False)

学習

  • 前処理と同様にRunTaskを継承して作成している。
  • luigiの機能の@requiresへルパを用いて、Preprocessorに依存していることを定義している。これによりTrainerを実行する前にPreprocessorが先に実行される。詳しくはluigiのdocument参照。
  • また、Trainerではsklearnのモデルで学習しているが、mlflowのmlflow.sklearn.autologを用いることで、Metricsなどがいくつか自動で保存することができる。自分で指標を設定したいときはlog_metricなどを用いる。詳しくはmlflowのLogging Functionsのdocument参照。
trainer.py
import pickle
from typing import Tuple

import mlflow.sklearn
import pandas as pd
from luigi.util import requires
from sklearn import linear_model

from tasks.preprocessor import Preprocessor
from tasks.run_task import RunTask


@requires(Preprocessor)
import pickle
from typing import Tuple

import mlflow.sklearn
import pandas as pd
from luigi.util import requires
from sklearn import linear_model

from tasks.preprocessor import Preprocessor
from tasks.run_task import RunTask


@requires(Preprocessor)
class Trainer(RunTask):
    def build_input_output_path_dicts(self) -> Tuple[dict, dict]:
        input_paths_dict = {
            "train": f"{self.path_config['Preprocessor']}train.csv",
            "test": f"{self.path_config['Preprocessor']}test.csv",
        }
        output_paths_dict = {
            "model": f"{self.path_config['Trainer']}model",
        }
        return input_paths_dict, output_paths_dict

    def run(self):
        train = pd.read_csv(
            self.input_paths_dict["train"],
            index_col='PassengerId')
        X = train.drop("Survived", axis=1)
        y = train["Survived"]
        mlflow.sklearn.autolog()
        reg = linear_model.RidgeClassifier(random_state=self.config["seed"])
        reg.fit(X, y)
        pickle.dump(reg, open(self.output_paths_dict["model"], "wb"))

推論

  • 前処理・学習と同様にRunTaskを継承して作成する。
  • 実行に前処理済みのtestと学習したmodelが必要になるため、Preprocessor, Trainerを@requiresに指定している。
prediction.py
import pickle
from typing import Tuple

import pandas as pd
from luigi.util import requires

from tasks.preprocessor import Preprocessor
from tasks.run_task import RunTask
from tasks.trainer import Trainer


@requires(Preprocessor, Trainer)
class Prediction(RunTask):
    def build_input_output_path_dicts(self) -> Tuple[dict, dict]:
        input_paths_dict = {
            "test": f"{self.path_config['Preprocessor']}test.csv",
            "model": f"{self.path_config['Trainer']}model",
        }
        output_paths_dict = {
            "test_prediction": f"{self.path_config['Prediction']}test_prediction.csv", }
        return input_paths_dict, output_paths_dict

    def run(self):
        test = pd.read_csv(
            self.input_paths_dict["test"],
            index_col="PassengerId")
        model = pickle.load(open(self.input_paths_dict["model"], "rb"))
        test_prediction = pd.DataFrame(
            model.predict(test),
            columns=["Survived"],
            index=test.index
        )
        test_prediction.to_csv(self.output_paths_dict["test_prediction"])

実行例

  • プロジェクトルートでpython src/runnr.pyを実行することで、configのtaskで指定したタスクを実行する。例えばtask="Prediction"としていれば、Predictionのrunを実行する。ただし、PredictionはPreprocessor,Trainerに依存しているため、先に前処理、学習が実行される。
  • 上記の実行後、下記のようにPreprocessor,Trainerのpathを各タスクの実行結果のディレクトリに指定して、再びtask="Prediction"として実行した場合、Predictionの入力として先程の実行結果が用いられる。この場合、前処理、学習の処理は省略できる。
[PathConfig]
Preprocessor="./mlruns/1/ba79349b64ac4493ac1f6c6eac306e68/artifacts/Preprocessor/"
Trainer="./mlruns/1/ba79349b64ac4493ac1f6c6eac306e68/artifacts/Trainer/"

課題

  • mlflowのartifact URIがデフォルトの前提で、コード内でパスを指定している。つまり、実行しているプロジェクトルート直下のmlruns/ディレクトリ下で管理する前提になっている。環境変数やconfigファイルなどで指定できるようにしたほうが望ましい。
  • 1つのファイルに全てのconfigをまとめているので、設定項目が多くなると頻雑になる。ただし、半端に分けても扱いづらくなると思うので難しいところ。
11
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
8