5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Docker + PostgreSQL + minio でMLflow 実行環境を構築

Posted at

この記事はなに?

PostgreSQL と MinIO をバックエンドにして、MLflow でモデルをロギングする環境を Docker で構築したときのメモです。

環境

python 3.9.16
mlflow 2.2.1

レポジトリ

以下で公開しています

使用ファイル

Dockerfile

# Base image: official Python 3.9.16 image (buster variant)
FROM python:3.9.16-buster

# Install the Python modules needed by the app.
# Only requirements.txt is copied into the image; the application code itself
# is not added here.
COPY requirements.txt /app/
RUN pip install -U pip && \
    pip install --no-cache-dir -r /app/requirements.txt

# Commands run in the container start from /app
WORKDIR /app

docker-compose.yml

version: "3"
services:
    # Container in which the MLflow scripts are run; built from the local Dockerfile.
    app:
        build:
            context: ./
            dockerfile: Dockerfile
        volumes:
            # Host directory ./mlflow is bind-mounted over the image's /app
            - ./mlflow:/app
        env_file:
            - ./.env
        environment:
            # PostgreSQL connection settings passed through to the container
            - DB_HOSTNAME=${DB_HOSTNAME}
            - DB_PORT=${DB_PORT}
            - DB_PASSWORD=${DB_PASSWORD}
            - DB_NAME=${DB_NAME}
            - DB_USERNAME=${DB_USERNAME}
            # S3 endpoint used for artifacts: the minio service on port 9000
            - MLFLOW_S3_ENDPOINT_URL=http://minio:9000
            - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
            - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
        # NOTE(review): presumably tty keeps the container alive so it can be
        # entered with `docker-compose exec app /bin/bash` - confirm.
        tty: true
        init: true
        depends_on:
            - minio
            - postgres

    # PostgreSQL database service, published on host port 5432.
    postgres:
        image: postgres
        ports:
            - 5432:5432
        env_file:
            - ./.env
        environment:
            - POSTGRES_PASSWORD=${DB_PASSWORD}
            - POSTGRES_DB=${DB_NAME}
            # NOTE(review): the official postgres image reads POSTGRES_USER, not
            # DB_USERNAME. Unless a script under ./postgresql creates this role,
            # DB_USERNAME has to match the image's default user - confirm.
            - DB_USERNAME=${DB_USERNAME}
            - POSTGRES_INITDB_ARGS="--encoding=UTF-8"
        volumes:
            # Named volume holding the database files
            - postgres-db:/var/lib/postgresql/data
            # Host directory mounted as the image's init-script directory
            - ./postgresql:/docker-entrypoint-initdb.d

        restart: always
        user: root
    # For inspecting the database (pgAdmin 4), published on host port 8002
    pgadmin4:
        image: dpage/pgadmin4:6
        #container_name: pgadmin4
        ports:
            - 8002:80
        volumes:
            # Named volume for pgAdmin's own data
            - pgadmin4_volume:/var/lib/pgadmin
        env_file:
            - ./.env
        environment:
            # Login credentials for the pgAdmin UI
            - PGADMIN_DEFAULT_EMAIL=${PGADMIN_DEFAULT_EMAIL}
            - PGADMIN_DEFAULT_PASSWORD=${PGADMIN_DEFAULT_PASSWORD}
        depends_on:
            - postgres
        restart: always

    # S3-compatible object storage
    minio:
        image: quay.io/minio/minio
        restart: unless-stopped
        volumes:
            # Object data is kept in the host directory ./minio
            - ./minio:/data
        ports:
            - 9000:9000
            - 9001:9001 # web console, for checking in a browser
        environment:
            MINIO_ROOT_USER: ${MINIO_ROOT_USER}
            MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD}
        # Serve /data and expose the console on port 9001
        command: server /data --console-address ":9001"

    # Automatically creates the default bucket when the minio container starts
    defaultbucket:
        image: minio/mc
        depends_on:
        - minio
        # Retries registering the minio host once per second until it succeeds,
        # then creates ${MINIO_MLFLOW_BUCKET}, sets its policy to "download",
        # and exits with status 0.
        # NOTE(review): the image is untagged; newer mc releases reportedly
        # replace `mc config host add` / `mc policy` with `mc alias set` /
        # `mc anonymous` - confirm these commands still work with the pulled image.
        entrypoint: >
            /bin/sh -c "
            until (/usr/bin/mc config host add minio http://minio:9000 ${MINIO_ROOT_USER} ${MINIO_ROOT_PASSWORD}) do echo 'try to create buckets...' && sleep 1; done;
            /usr/bin/mc mb ${MINIO_MLFLOW_BUCKET};
            /usr/bin/mc policy download ${MINIO_MLFLOW_BUCKET};
            exit 0;
            "

volumes:
    postgres-db:
    pgadmin4_volume:

実行スクリプト

accessor.py

アクセス用、検索用のクラスを定義して使用しています。

from pprint import pprint

import mlflow
from mlflow.tracking import MlflowClient


class MlflowWriter:
    """Write parameters, metrics, artifacts and models to a single MLflow run.

    Constructing the writer resolves the experiment (creating it when it does
    not exist yet) and opens a new run in it; all ``log_*`` / ``set_*`` methods
    then write to that run.
    """

    def __init__(self, experiment_name, artifact_location=None, **kwargs):
        """Resolve the experiment and start a new run in it.

        Args:
            experiment_name (str): Name of the experiment.
            artifact_location (str, optional): Where artifacts are stored,
                e.g. ``s3://<bucket_name>/artifacts``. Only used when the
                experiment is created by this call. Defaults to None.
            **kwargs: Forwarded unchanged to ``MlflowClient``.
        """
        self.client = MlflowClient(**kwargs)
        # Look the experiment up first rather than calling create_experiment
        # inside a bare ``except``: that swallowed every error (including
        # connection failures) and then failed on ``None.experiment_id``.
        experiment = self.client.get_experiment_by_name(experiment_name)
        if experiment is None:
            self.experiment_id = self.client.create_experiment(
                experiment_name,
                artifact_location=artifact_location,
            )
        else:
            self.experiment_id = experiment.experiment_id

        self.run_id = self.client.create_run(self.experiment_id).info.run_id
        self.artifact_path = "model"

    def log_params_from_omegaconf_dict(self, params):
        """Log every entry of a (possibly nested) config as run parameters.

        Nested mappings are flattened into dotted names (``model.name``) and
        list items are named by index (``layers.0``). Top-level scalar values
        are logged under their own key.

        Args:
            params: Mapping of parameter names to values, e.g. an OmegaConf
                ``DictConfig`` or a plain ``dict``.
        """
        for param_name, element in params.items():
            self._explore_recursive(param_name, element)

    def _explore_recursive(self, parent_name, element):
        """Log ``element`` under ``parent_name``, descending into mappings."""
        # Checked against the abstract container types instead of OmegaConf's
        # DictConfig / ListConfig, which this module never imports.
        from collections.abc import Mapping, Sequence

        if isinstance(element, Mapping):
            for k, v in element.items():
                self._explore_recursive(f"{parent_name}.{k}", v)
        elif isinstance(element, Sequence) and not isinstance(element, (str, bytes)):
            # List items are logged as they are, one parameter per index.
            for i, v in enumerate(element):
                self.client.log_param(self.run_id, f"{parent_name}.{i}", v)
        else:
            # Leaf value (also reached for top-level scalars).
            self.client.log_param(self.run_id, parent_name, element)

    def log_torch_model(self, model, registered_model_name):
        """Log a torch model and register it under ``registered_model_name``."""
        with mlflow.start_run(self.run_id):
            mlflow.pytorch.log_model(model, self.artifact_path, registered_model_name=registered_model_name)

    def log_torch_state_dict(self, model):
        """Log only the ``state_dict`` of a torch model."""
        with mlflow.start_run(self.run_id):
            mlflow.pytorch.log_state_dict(
                model.state_dict(),
                artifact_path=self.artifact_path,
            )

    def load_torch_model(self, model_name, model_version):
        """Load a registered torch model from ``models:/<name>/<version>``."""
        return mlflow.pytorch.load_model(model_uri=f"models:/{model_name}/{model_version}")

    def log_sklearn_model(self, model, registered_model_name=None):
        """Log a scikit-learn model.

        Args:
            model: The model to save.
            registered_model_name (str, optional): Name of the model inside
                MLflow. Saving again under the same name bumps the model
                version. Defaults to None.
        """
        with mlflow.start_run(self.run_id):
            mlflow.sklearn.log_model(model, self.artifact_path, registered_model_name=registered_model_name)

    def log_param(self, key, value):
        """Log a single parameter.

        Args:
            key: Parameter name.
            value: Parameter value.
        """
        self.client.log_param(self.run_id, key, value)

    def log_metric(self, key, value, step=0):
        """Log a single metric value at ``step``."""
        self.client.log_metric(self.run_id, key, value, step=step)

    def log_artifact(self, local_path):
        """Log a local file as an artifact.

        Args:
            local_path (str): Path of the file to save.
        """
        self.client.log_artifact(self.run_id, local_path)

    def set_tag(self, key, value):
        """Attach a tag to the run."""
        self.client.set_tag(self.run_id, key, value)

    def set_terminated(self):
        """Finish recording: mark the run as terminated."""
        self.client.set_terminated(self.run_id)


class MLflowSearcher:
    """Look up registered model versions and metric histories in MLflow."""

    def __init__(self):
        """Create the searcher with a default ``MlflowClient``."""
        self.client = mlflow.tracking.MlflowClient()

    def _print_model_versions(self, filter_string):
        """Pretty-print every model version matching ``filter_string``."""
        for version in self.client.search_model_versions(filter_string):
            pprint(version)

    def search_model_by_run_id(self, run_id):
        """Print the model versions whose run id equals ``run_id``.

        Args:
            run_id (str): Run id to match.
        """
        self._print_model_versions(f'run_id = "{run_id}"')

    def search_model_by_model_name(self, model_name):
        """Print the model versions registered under ``model_name``."""
        self._print_model_versions(f'name = "{model_name}"')

    def get_metric_history(self, run_id, metric_names):
        """Print the recorded history of each named metric of a run.

        Args:
            run_id (str): Run whose metrics are read.
            metric_names: Iterable of metric names to print, in order.
        """
        for metric_name in metric_names:
            for point in self.client.get_metric_history(run_id, metric_name):
                print("name: {}".format(point.key))
                print("value: {}".format(point.value))
                print("step: {}".format(point.step))
                print("timestamp: {}".format(point.timestamp))
                print("--")

main.py

scikit-learnモデルを学習し、精度のロギング、レジストリへの保存を行い、保存した実験結果の検索を実行しています。
精度情報などはpostgresqlに、モデルファイルはminioストレージに保存されます。

import logging
import os
from urllib.parse import urlparse

import numpy as np
import pandas as pd
from accessor import MLflowSearcher, MlflowWriter
from sklearn.linear_model import ElasticNet
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

import mlflow
import mlflow.sklearn
from mlflow.store.artifact.runs_artifact_repo import RunsArtifactRepository

# PostgreSQL connection settings, read from the process environment.
DB = os.environ.get("DB_NAME")
USER = os.environ.get("DB_USERNAME")
PASSWORD = os.environ.get("DB_PASSWORD")
HOST = os.environ.get("DB_HOSTNAME")
PORT = os.environ.get("DB_PORT")
# Point MLflow's tracking store at the PostgreSQL database.
# NOTE(review): the values are interpolated verbatim; a user name or password
# containing URL-reserved characters (e.g. "@", "/") would need URL-encoding.
os.environ["MLFLOW_TRACKING_URI"] = f"postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/{DB}"

# Keep the root logger at WARNING so library output stays quiet ...
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
# ... but this script reports its progress with logger.info(), which the root
# level alone would discard, so enable INFO for this module's logger.
logger.setLevel(logging.INFO)


def eval_metrics(actual, pred):
    """Return ``(rmse, mae, r2)`` for predictions ``pred`` against ``actual``."""
    mse = mean_squared_error(actual, pred)
    return np.sqrt(mse), mean_absolute_error(actual, pred), r2_score(actual, pred)


def train():
    """Train an ElasticNet on the wine-quality data and log the run to MLflow.

    Downloads the red-wine quality CSV, fits the model, prints and logs its
    metrics and parameters, stores the model through ``MlflowWriter`` and
    marks the run as terminated.

    Returns:
        MlflowWriter: The writer used for the run; its ``run_id`` identifies
        the logged run.

    Raises:
        Exception: Whatever ``pandas.read_csv`` raised when the CSV could not
            be downloaded or parsed (logged, then re-raised).
    """
    # Read the wine-quality csv file from the URL
    csv_url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
    try:
        data = pd.read_csv(csv_url, sep=";")
    except Exception as e:
        logger.exception(
            "Unable to download training & test CSV, check your internet connection. Error: %s",
            e,
        )
        # Without the data nothing below can run; re-raise instead of failing
        # later with an unrelated NameError on ``data``.
        raise

    # Split the data into training and test sets. (0.75, 0.25) split.
    train_df, test_df = train_test_split(data)

    # The predicted column is "quality" which is a scalar from [3, 9]
    train_x = train_df.drop(["quality"], axis=1)
    test_x = test_df.drop(["quality"], axis=1)
    train_y = train_df[["quality"]]
    test_y = test_df[["quality"]]

    alpha = 0.5
    l1_ratio = 0.5

    # Names recorded with the run in MLflow.
    model_name = "model1"
    # NOTE(review): this name was referenced but never defined; "system1" is
    # the value used by the tag-search example in this script - confirm.
    system_name = "system1"

    writer = MlflowWriter(experiment_name=f"exp-{model_name}", artifact_location="s3://mlflow/artifacts")
    lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
    lr.fit(train_x, train_y)

    predicted_qualities = lr.predict(test_x)

    (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

    print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
    print("  RMSE: %s" % rmse)
    print("  MAE: %s" % mae)
    print("  R2: %s" % r2)

    # Save parameters, metrics and tags
    writer.log_param("alpha", alpha)
    writer.log_param("l1_ratio", l1_ratio)
    writer.log_metric("rmse", rmse)
    writer.log_metric("r2", r2)
    writer.log_metric("mae", mae)
    writer.set_tag("system_name", system_name)
    writer.set_tag("model_name", model_name)

    tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme
    print("tracking_uri: ", mlflow.get_tracking_uri())
    print("tracking_url_type: ", tracking_url_type_store)

    # Model registry does not work with file store, so only register the
    # model under a name when another kind of tracking store is in use.
    if tracking_url_type_store != "file":
        writer.log_sklearn_model(lr, model_name)
    else:
        writer.log_sklearn_model(lr)

    # Report the ids of the run that was just recorded
    logger.info("Run ID: %s Exp_ID: %s", writer.run_id, writer.experiment_id)
    writer.set_terminated()
    return writer


def search(run_id):
    """Print the model versions and the metric history recorded for a run.

    Args:
        run_id (str): Id of the run to look up.
    """
    metric_names = ["rmse", "r2", "mae"]
    searcher = MLflowSearcher()

    logger.info("run id searching...")
    searcher.search_model_by_run_id(run_id)

    logger.info("metrics...")
    searcher.get_metric_history(run_id, metric_names)


if __name__ == "__main__":
    # Train and log one run, then look that same run up again by its id.
    logger.info("run start")
    finished_writer = train()
    search(finished_writer.run_id)

実行環境立ち上げ

# 起動
docker-compose up -d

# mlflow実行用コンテナにアクセス
docker-compose exec app /bin/bash

学習実行

python run_mlflow.py

結果

http://localhost:9001 にアクセスしてコンソールから minio バケットにモデルが保存されていることが確認できます。
ユーザー名、パスワードはそれぞれ minio, miniopass としています。
image.png

メモ

minio は s3互換のストレージなので、awsで実装する際のローカル環境でのテストにも使えます。

参考

以下を参考にさせていただきました.

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?