What is this article?
Notes from building a Docker-based environment that logs models with MLflow, backed by PostgreSQL and MinIO.
Environment
python 3.9.16
mlflow 2.2.1
Repository
The full code is available in the repository below.
Files used
Dockerfile
# our base image
FROM python:3.9.16-buster

# install Python modules needed by the Python app
COPY requirements.txt /app/
RUN pip install -U pip && \
    pip install --no-cache-dir -r /app/requirements.txt

WORKDIR /app
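The image installs its dependencies from requirements.txt, which lives in the repository. A plausible minimal set for this article (every entry except the mlflow version from the Environment section is my assumption) would look like:

mlflow==2.2.1
psycopg2-binary   # for the postgresql+psycopg2 tracking URI
boto3             # for the S3 (MinIO) artifact store
scikit-learn
pandas
numpy
omegaconf         # for DictConfig/ListConfig in accessor.py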
docker-compose.yml
version: "3"
services:
app:
build:
context: ./
dockerfile: Dockerfile
volumes:
- ./mlflow:/app
env_file:
- ./.env
environment:
- DB_HOSTNAME=${DB_HOSTNAME}
- DB_PORT=${DB_PORT}
- DB_PASSWORD=${DB_PASSWORD}
- DB_NAME=${DB_NAME}
- DB_USERNAME=${DB_USERNAME}
- MLFLOW_S3_ENDPOINT_URL=http://minio:9000
- AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
tty: true
init: true
depends_on:
- minio
- postgres
postgres:
image: postgres
ports:
- 5432:5432
env_file:
- ./.env
environment:
- POSTGRES_PASSWORD=${DB_PASSWORD}
- POSTGRES_DB=${DB_NAME}
- DB_USERNAME=${DB_USERNAME}
- POSTGRES_INITDB_ARGS="--encoding=UTF-8"
volumes:
- postgres-db:/var/lib/postgresql/data
- ./postgresql:/docker-entrypoint-initdb.d
restart: always
user: root
# DB確認用
pgadmin4:
image: dpage/pgadmin4:6
#container_name: pgadmin4
ports:
- 8002:80
volumes:
- pgadmin4_volume:/var/lib/pgadmin
env_file:
- ./.env
environment:
- PGADMIN_DEFAULT_EMAIL=${PGADMIN_DEFAULT_EMAIL}
- PGADMIN_DEFAULT_PASSWORD=${PGADMIN_DEFAULT_PASSWORD}
depends_on:
- postgres
restart: always
# S3互換のストレージ
minio:
image: quay.io/minio/minio
restart: unless-stopped
volumes:
- ./minio:/data
ports:
- 9000:9000
- 9001:9001 # ブラウザ確認用
environment:
MINIO_ROOT_USER: ${MINIO_ROOT_USER}
MINIO_ROOT_PASSWORD: ${MINIO_ROOT_PASSWORD}
command: server /data --console-address ":9001"
# minioコンテナ起動時にデフォルトのバケットを自動作成する
defaultbucket:
image: minio/mc
depends_on:
- minio
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc config host add minio http://minio:9000 ${MINIO_ROOT_USER} ${MINIO_ROOT_PASSWORD}) do echo 'try to create buckets...' && sleep 1; done;
/usr/bin/mc mb ${MINIO_MLFLOW_BUCKET};
/usr/bin/mc policy download ${MINIO_MLFLOW_BUCKET};
exit 0;
"
volumes:
postgres-db:
pgadmin4_volume:
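All of the ${...} references above are resolved from a .env file next to docker-compose.yml. The repository's actual file is not reproduced here; a minimal sketch with placeholder values (DB_HOSTNAME must match the postgres service name, AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY must match the MinIO root credentials so the app container can write artifacts, and MINIO_MLFLOW_BUCKET combines the mc alias with the bucket name from main.py's artifact_location) might look like:

DB_HOSTNAME=postgres
DB_PORT=5432
DB_USERNAME=postgres
DB_PASSWORD=postgres
DB_NAME=mlflow
AWS_ACCESS_KEY_ID=minio
AWS_SECRET_ACCESS_KEY=miniopass
MINIO_ROOT_USER=minio
MINIO_ROOT_PASSWORD=miniopass
MINIO_MLFLOW_BUCKET=minio/mlflow
PGADMIN_DEFAULT_EMAIL=admin@example.com
PGADMIN_DEFAULT_PASSWORD=admin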
Scripts
accessor.py
Defines a class for writing to MLflow and a class for searching it.
from pprint import pprint

import mlflow
from mlflow.tracking import MlflowClient
from omegaconf import DictConfig, ListConfig


class MlflowWriter:
    def __init__(self, experiment_name, artifact_location=None, **kwargs):
        """Class that writes to mlflow.

        Args:
            experiment_name (str): experiment name
            artifact_location (str, optional): where to store artifacts, e.g. s3://<bucket_name>/artifacts. Defaults to None.
        """
        self.client = MlflowClient(**kwargs)
        try:
            self.experiment_id = self.client.create_experiment(
                experiment_name,
                artifact_location=artifact_location,
            )
        except Exception:
            # the experiment already exists, so reuse its id
            self.experiment_id = self.client.get_experiment_by_name(experiment_name).experiment_id
        self.run_id = self.client.create_run(self.experiment_id).info.run_id
        self.artifact_path = "model"

    def log_params_from_omegaconf_dict(self, params):
        for param_name, element in params.items():
            self._explore_recursive(param_name, element)

    def _explore_recursive(self, parent_name, element):
        if isinstance(element, DictConfig):
            for k, v in element.items():
                if isinstance(v, (DictConfig, ListConfig)):
                    self._explore_recursive(f"{parent_name}.{k}", v)
                else:
                    self.client.log_param(self.run_id, f"{parent_name}.{k}", v)
        elif isinstance(element, ListConfig):
            for i, v in enumerate(element):
                self.client.log_param(self.run_id, f"{parent_name}.{i}", v)

    def log_torch_model(self, model, registered_model_name):
        """Logs a torch model."""
        with mlflow.start_run(self.run_id):
            mlflow.pytorch.log_model(model, self.artifact_path, registered_model_name=registered_model_name)

    def log_torch_state_dict(self, model):
        with mlflow.start_run(self.run_id):
            mlflow.pytorch.log_state_dict(
                model.state_dict(),
                artifact_path=self.artifact_path,
            )

    def load_torch_model(self, model_name, model_version):
        return mlflow.pytorch.load_model(model_uri=f"models:/{model_name}/{model_version}")

    def log_sklearn_model(self, model, registered_model_name=None):
        """Logs an sklearn model.

        model: the model to save
        registered_model_name: the model name inside mlflow.
            Saving another model under the same name bumps the model version.
        """
        with mlflow.start_run(self.run_id):
            mlflow.sklearn.log_model(model, self.artifact_path, registered_model_name=registered_model_name)

    def log_param(self, key, value):
        """Logs a parameter.

        Args:
            key (str): parameter name
            value: parameter value
        """
        self.client.log_param(self.run_id, key, value)

    def log_metric(self, key, value, step=0):
        self.client.log_metric(self.run_id, key, value, step=step)

    def log_artifact(self, local_path):
        """Logs an artifact.

        Args:
            local_path (str): path of the file to save
        """
        self.client.log_artifact(self.run_id, local_path)

    def set_tag(self, key, value):
        """Sets a tag on the run."""
        self.client.set_tag(self.run_id, key, value)

    def set_terminated(self):
        """Marks the run as finished."""
        self.client.set_terminated(self.run_id)


class MLflowSearcher:
    def __init__(self):
        """Class that searches mlflow."""
        self.client = mlflow.tracking.MlflowClient()

    def search_model_by_run_id(self, run_id):
        """Prints the model versions matching a run_id.

        Args:
            run_id (str): run_id
        """
        results = self.client.search_model_versions(f'run_id = "{run_id}"')
        for res in results:
            pprint(res)

    def search_model_by_model_name(self, model_name):
        results = self.client.search_model_versions(f'name = "{model_name}"')
        for res in results:
            pprint(res)

    def get_metric_history(self, run_id, metric_names):
        def print_metric_info(history):
            for m in history:
                print("name: {}".format(m.key))
                print("value: {}".format(m.value))
                print("step: {}".format(m.step))
                print("timestamp: {}".format(m.timestamp))
                print("--")

        for name in metric_names:
            print_metric_info(self.client.get_metric_history(run_id, name))
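main.py below does not exercise log_params_from_omegaconf_dict, so for completeness here is a minimal sketch of how it would be used with an OmegaConf config (the experiment name and config values are made up for illustration, and MLFLOW_TRACKING_URI is assumed to be set as in main.py):

from omegaconf import OmegaConf

from accessor import MlflowWriter

# hypothetical nested config, just for illustration
cfg = OmegaConf.create({"model": {"alpha": 0.5, "l1_ratio": 0.5}})

writer = MlflowWriter(experiment_name="exp-omegaconf-demo")
# logs the flattened parameters model.alpha and model.l1_ratio
writer.log_params_from_omegaconf_dict(cfg)
writer.set_terminated()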
main.py
Trains a scikit-learn model, logs its accuracy metrics, saves the model to the registry, and then searches the stored results.
Metrics and parameters are stored in PostgreSQL, and the model files in MinIO storage.
import logging
import os
from urllib.parse import urlparse

import numpy as np
import pandas as pd
from accessor import MLflowSearcher, MlflowWriter
from sklearn.linear_model import ElasticNet
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split

import mlflow

DB = os.environ.get("DB_NAME")
USER = os.environ.get("DB_USERNAME")
PASSWORD = os.environ.get("DB_PASSWORD")
HOST = os.environ.get("DB_HOSTNAME")
PORT = os.environ.get("DB_PORT")

# mlflow backend store (PostgreSQL) settings
os.environ["MLFLOW_TRACKING_URI"] = f"postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/{DB}"

logging.basicConfig(level=logging.WARN)
logger = logging.getLogger(__name__)


def eval_metrics(actual, pred):
    rmse = np.sqrt(mean_squared_error(actual, pred))
    mae = mean_absolute_error(actual, pred)
    r2 = r2_score(actual, pred)
    return rmse, mae, r2


def train():
    # Read the wine-quality csv file from the URL
    csv_url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
    try:
        data = pd.read_csv(csv_url, sep=";")
    except Exception as e:
        logger.exception(
            "Unable to download training & test CSV, check your internet connection. Error: %s",
            e,
        )

    # Split the data into training and test sets. (0.75, 0.25) split.
    train, test = train_test_split(data)

    # The predicted column is "quality" which is a scalar from [3, 9]
    train_x = train.drop(["quality"], axis=1)
    test_x = test.drop(["quality"], axis=1)
    train_y = train[["quality"]]
    test_y = test[["quality"]]

    alpha = 0.5
    l1_ratio = 0.5

    # Record with mlflow
    system_name = "system1"
    model_name = "model1"
    writer = MlflowWriter(experiment_name=f"exp-{model_name}", artifact_location="s3://mlflow/artifacts")

    lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
    lr.fit(train_x, train_y)

    predicted_qualities = lr.predict(test_x)
    (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

    print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
    print(" RMSE: %s" % rmse)
    print(" MAE: %s" % mae)
    print(" R2: %s" % r2)

    # Save parameters and metrics
    writer.log_param("alpha", alpha)
    writer.log_param("l1_ratio", l1_ratio)
    writer.log_metric("rmse", rmse)
    writer.log_metric("r2", r2)
    writer.log_metric("mae", mae)
    writer.set_tag("system_name", system_name)
    writer.set_tag("model_name", model_name)

    tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme
    print("tracking_uri: ", mlflow.get_tracking_uri())
    print("tracking_url_type: ", tracking_url_type_store)

    # Model registry does not work with file store
    if tracking_url_type_store != "file":
        writer.log_sklearn_model(lr, model_name)
    else:
        # no registry available, so log the model without registering it
        writer.log_sklearn_model(lr)

    # Record the run_id
    logger.info(f"Run ID: {writer.run_id} Exp_ID: {writer.experiment_id}")
    writer.set_terminated()
    return writer


def search(run_id):
    searcher = MLflowSearcher()
    logger.info("run id searching...")
    searcher.search_model_by_run_id(run_id)
    # logger.info("tag searching...")
    # searcher.search_model_by_tag("system_name", "system1")
    logger.info("metrics...")
    searcher.get_metric_history(run_id, ["rmse", "r2", "mae"])


if __name__ == "__main__":
    logger.info("run start")
    writer = train()
    search(writer.run_id)
Launching the environment
# start the containers
docker-compose up -d
# get a shell inside the mlflow app container
docker-compose exec app /bin/bash
Running training
python main.py
Results
Open http://localhost:9001 and you can confirm from the console that the model has been saved to the MinIO bucket.
The username and password here are minio and miniopass, respectively.
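To pull a registered model back out of the registry, something like the following works from inside the app container (the model name "model1" follows main.py; the version number 1 is my assumption for a first run, and the sample row is the first record of the wine-quality CSV):

import os

import mlflow
import mlflow.sklearn

# same backend store settings as main.py; the S3/MinIO variables
# (MLFLOW_S3_ENDPOINT_URL, AWS_*) are already set by docker-compose
os.environ["MLFLOW_TRACKING_URI"] = (
    f"postgresql+psycopg2://{os.environ['DB_USERNAME']}:{os.environ['DB_PASSWORD']}"
    f"@{os.environ['DB_HOSTNAME']}:{os.environ['DB_PORT']}/{os.environ['DB_NAME']}"
)

model = mlflow.sklearn.load_model("models:/model1/1")
print(model.predict([[7.4, 0.7, 0.0, 1.9, 0.076, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4]]))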
Notes
Since MinIO is S3-compatible storage, it is also handy as a local stand-in when testing code that will eventually run against AWS.
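For example, code written against boto3 only needs its endpoint switched to talk to the local MinIO instead of real S3 (a minimal sketch; the credentials are the placeholder values from the .env example above):

import boto3

# point the standard S3 client at local MinIO; drop endpoint_url for real AWS
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minio",
    aws_secret_access_key="miniopass",
)
for obj in s3.list_objects_v2(Bucket="mlflow").get("Contents", []):
    print(obj["Key"])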
References
This article drew on the following resources.