pythonライブラリのmlflowを使って、scikit-learnモデルの再学習・精度評価・運用モデル更新を行う仕組みを作った時の作業メモです。
作りたい環境の大まかな要件
モデルの本番運用を開始した後、定期的に新規データで再学習を行い運用モデルを更新していく仕組みです。
手短に箇条書きすると、こんなイメージです。
- モデル学習
- 定期的に最新の学習データを使用してモデルを作成し、リポジトリに新規バージョンとして登録
- 毎週/毎月などの定期的なバッチ処理を想定
- モデル精度評価
- リポジトリ内のモデルの各バージョンに対して、評価対象データに対する予測実行を行い予測精度を算出
- これも定期的なバッチ処理を想定
- 精度評価結果はUI上で人が確認する
- モデル更新
- 自動更新の場合:定期的な再学習で生成された最新バージョンを運用バージョン(production)として更新登録
- 手動更新の場合:最新の精度評価結果をUI上で人が確認し、最良と判断した特定のバージョンを運用バージョン(production)として更新登録
想定するユースケース例:
- 需要予測 : 毎週の需要を過去データ(時系列)から予測。毎月、最新2年間のデータで再学習を実行し、生成された最新バージョンを運用モデルとして即時反映する(自動モデル更新)
- 製造ラインでの良品判定 : 検査結果の合否を製造条件から予測。毎週、最新3ヶ月のデータで再学習を実行するが、運用モデルは製造責任者がモデル精度比較結果を見ながら選択する(手動モデル更新)
環境
- CentOS 8.2.2004 (Core)
- Anaconda3 (2020.11-Linux-X84_64)
- Python 3.8.3
- scikit-learn 0.23.1
- sqlite 3.32.3
- mlflow 1.13.1
準備
Anaconda導入
今回はJupyterLabでのモデル開発を手軽にやるため、Anacondaを入れます。他のPython環境構築方法が良い方はお好みでどうぞ。
# yumをアップデート
sudo yum update
# gitの導入 (pyenv導入のため)
sudu yum install -y git
#pyenvの導入
git clone git://github.com/yyuu/pyenv.git ~/.pyenv
echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bash_profile
echo 'export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bash_profile
echo 'eval "$(pyenv init -)"' >> ~/.bash_profile
source ~/.bash_profile
# インストールしたいAnacondaバージョンを確認
pyenv install -l | grep anaconda
# Anacondaの導入
pyenv install anaconda3-2020.07
# 導入されたAnacondaバージョンの確認
pyenv versions
# python環境を導入したAnaconda環境に切り替え
pyenv global anaconda3-2020.07
mlflowの導入
mlflowはpipで導入します。 同時に前提となるgunicornやflaskなども導入されます。
pip install mlflow
sqlite3の導入
今回はmlflowトラッキングサーバーのbackend storeとして最も簡単にセットアップできるsqlite3を使いますが、既に導入したanaconda3-2020.07に含まれているため、追加手順は不要です。
もし別の方法でpython環境をセットアップした場合は、pipでsqlite3を導入します。
pip install sqlite3
mlflowトラッキングサーバーの起動
mlflowでは2つのストレージ領域を使用します。
- Backend Store : "Models"でバージョン管理されるモデルの格納領域。SQLAlchemy database URI形式でアクセス可能なデータベースを使用する必要がある。
- Artifacts Store : "Experiments"で管理される実験(モデル学習や評価)の履歴の格納領域
両者の詳細はmlflowのドキュメントを参照してください。
https://mlflow.org/docs/latest/tracking.html#backend-stores
https://mlflow.org/docs/latest/tracking.html#artifact-stores
今回はBackend Storeとして簡単に用意できるsqlite3を、Artifacts Storeはローカルディレクトリを使用します。
# backend storeとartifacts用のディレクトリの作成(オプション)
sudo mkdir /mnt/share
sudo chmod 777 /mnt/share
# mlflowトラッキングサーバーの起動
mlflow server --backend-store-uri sqlite:////mnt/share/mlflow.db --default-artifact-root /mnt/share/mlflow_artifacts --host 0.0.0.0 --port 5000
(option) Firewall設定
外部からmlflowトラッキングサーバーのURL(上記の場合 http://IPaddress:5000 )にアクセスする場合は、Firewallを適切します。以下はテスト用途でよくやるFirewallの一括停止です。
# firewalldの停止
systemctl stop firewalld
# firewalldの自動起動の停止
systemctl disable firewalld
mlflowトラッキングサーバーへのアクセス
Webブラウザで起動したトラッキングサーバー http://IPaddress:5000 へアクセスして、mlflow画面が表示されることを確認します。ExperimentsもModelsもまだ空の状態です。
モデル学習
ここからはpythonでmlflowを操ります。python開発環境はAnacondaで導入されたJupyterLabを使います。JupyterLab周りの解説は省略します。
- データはscikit-learnのirisを使用
- 種別毎にモデルを分割することを想定し、3つのモデルタイプを定義 (model_types)
- 学習した後に、mlflowで情報をトラッキングサーバーに記録
ライブラリのインポートと定数定義
# ライブラリインポート
import pandas as pd
import numpy as np
import copy
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
import mlflow
train_result_file_nameは、モデル学習結果を一時的なテキストファイルとして出力し、それをArtifactsとして登録するために使用しています。
# 各モデル学習の詳細情報の一時出力先ファイル名
train_result_file_name = '/tmp/train_result.txt'
# mlflow tracking server URL
mlflow_tracking_server_url = 'http://localhost:5000'
mlflow.set_tracking_uri(mlflow_tracking_server_url)
以下はモデル自動更新を行う場合に必要です。注意として、上記のset_tracking_uriの後に実行するか、引数としてtracking_uri='http://localhost:5000'
を指定する必要があります。そうでない場合、不明なKeyErrorが発生してハマります。
# mlflow tracking serverのクライアントインスタンス(Productionバージョン操作用)
client = mlflow.tracking.MlflowClient()
#client = mlflow.tracking.MlflowClient(tracking_uri=mlflow_tracking_server_url)
##モデルタイプ・目的変数・説明変数の定義
# 学習データテーブルでのモデルタイプ列名 ★★★ 大文字小文字の区別は要検討 ★★★
model_types_column_name = 'mtype'
# モデルタイプ
model_types = [
{'name': 'type1', 'detail': 'モデルパターンの詳細説明1'},
{'name': 'type2', 'detail': 'モデルパターンの詳細説明2'},
{'name': 'type3', 'detail': 'モデルパターンの詳細説明3'}
]
# 目的変数列名
target_val_colmun_name = 'target'
# 説明変数列名
feature_val_column_names = [
'sepal length (cm)',
'sepal width (cm)',
'petal length (cm)',
'petal width (cm)'
]
学習データの読み込み
# サンプルデータとしてirisを使う。合計150件
from sklearn.datasets import load_iris
iris = load_iris()
df = pd.DataFrame(iris.data, columns=iris.feature_names)
df['target'] = iris.target_names[iris.target]
# モデルタイプ列は乱数で生成する
import random
random.seed(1)
mtype_list = []
for i in range(len(df)):
mtype_list.append(model_types[random.randint(0,len(model_types)-1)]['name'])
df[model_types_column_name] = mtype_list
JupyterLab上でdf.head()
で確認すると、以下のデータが確認できます。
モデル学習
モデルタイプ毎にループを回して、3つのモデルを作成しています。なお、定期的な再学習を想定しているため、testとtrainのsplitはせず、全データを学習用として使用します。学習後には、mlflowによるExperimentの記録とモデル登録も行っています。
# 現在日付の取得 (Experimentsでのrunの名前に使用)
from pytz import timezone
import datetime
now_datetime = datetime.datetime.now(timezone('Asia/Tokyo')).strftime('%Y%m%d_%H%M%S')
now_date = now_datetime[0:8]
for model_type in model_types:
# モデルタイプ毎のデータの切り出し
df_mtype = df[df[model_types_column_name] == model_type['name']]
# 説明変数・目的変数の分割
X = df_mtype[feature_val_column_names]
y = df_mtype[target_val_colmun_name]
# モデル学習
model = RandomForestClassifier(max_depth=2, random_state=5, n_estimators=10)
model.fit(X, y)
# 学習データに対する精度(参考値)
predicted = model.predict(X)
ac_score = accuracy_score(y, predicted)
con_matrix = confusion_matrix(y, predicted)
clf_report = classification_report(y, predicted)
# 学習結果のmlflowへの登録
mlflow.set_experiment('モデル学習履歴')
mlflow_run_name = now_date+'学習結果_' + model_type['name']
with mlflow.start_run(run_name=mlflow_run_name) as run:
# パラメータとしてrunへ記録
mlflow.log_param('モデルタイプ名', model_type['name'])
mlflow.log_param('学習データ件数', len(X))
mlflow.log_param('正解率_学習データ',ac_score) # log_metricは日本語が使えないためparamに含める
# モデル学習結果の詳細をファイルに出力し、runのartifactに登録
with open(train_result_file_name, 'w') as f:
print('モデルタイプ名 :', model_type['name'], file=f)
print('目的変数 :', target_val_colmun_name, file=f)
print('説明変数 :', feature_val_column_names, file=f)
print('学習データ件数 :', len(X), '\n', file=f)
print('モデル精度(学習データ)', '\n', '------------------------------------------------', file=f)
print('混同行列(confusion_matrix) :', '\n', confusion_matrix(y,predicted), file=f)
print('目的変数ラベル', np.sort(y.unique()), '\n', file=f)
print('正解率(accuracy) :', '\n', ac_score, '\n', file=f)
print('精度レポート(classification_report) :', '\n', clf_report, '\n', file=f)
mlflow.log_artifact(train_result_file_name)
# モデルをrunへ登録
mlflow.sklearn.log_model(sk_model=model, artifact_path='model')
# モデルをModelsへ登録(Register model)
model_uri = 'runs:/{}/model'.format(run.info.run_id)
reg_model = mlflow.register_model(model_uri, model_type['name'])
# Productionモデルの更新モード (auto : 自動更新(再学習モデルを即時反映) / manual : 手動更新(責任者が画面上で操作))
#---------------------------------------------------------------#
# 本番用途では、この値をDBなどから取得。今回は暫定的にハードコーディングする。
#---------------------------------------------------------------#
model_update_mode = 'manual'
# モデルバージョンが1の場合は必ずProductionとして登録する
if reg_model.version == '1':
client.transition_model_version_stage(
name = model_type['name'],
version = reg_model.version,
stage = "Production",
archive_existing_versions = True
)
else:
# モデル更新モード:自動
if model_update_mode == 'auto':
client.transition_model_version_stage(
name = model_type['name'],
version = reg_model.version,
stage = "Production",
archive_existing_versions = True
)
学習結果(1回目)
学習のコードを1回実行した後の状態です。mlflow画面のExperimentsに学習結果が記録されています。
(解説)
- Experiments名は
mlflow.set_experiment
で指定した「モデル学習履歴」 - 3つのrunが登録されている。Run名は
mlflow.start_run()
のrun_name
オプションで指定した文字列 "YYYYMMDD学習結果_モデルタイプ名" - Parametersは
mlflow.log_param
で指定した「モデルタイプ名」「学習データ件数」「正解率_学習データ」が表示されている。 - ちなみに「正解率」のようなものは、本来は
mlflow.log_metric
で記録するべきもの。mlflow.log_metric
では日本語を指定できなかったので敢えてlog_param
で記録している。
こちらは、モデル学習履歴のStartTimeのリンクをクリックした画面です。
Artifactsの部分に学習結果であるモデルが表示されています。今回のトラッキングサーバーのArtifacts Storeはローカルディレクトリを使用しており、その格納場所は/mnt/share/mlflow_artifacts/1/950d96375b044a2383cde334ff86534b/artifacts/model
であることが分かります。Make Predictionsの部分に、モデルの呼び出しと予測実行のサンプルコードも見えます。
Artifactsには、mlflow.log_artifact
で自由にファイルを登録することができます。今回は学習モデルの評価結果をテキストファイル化したものを登録しています。
こちらはModelsの画面です。mlflow.register_model
によって、モデルタイプtype1〜3の学習済モデルがVersion1として登録されていることが分かります。さらにclient.transition_model_version_stage
によって、登録されたモデルがバージョン1の場合は初期モデルとしてProductionとしても登録しています。これは運用途中でモデルタイプが増えた時のことを想定しています。
ちなみにmlflowには、Production以外にStagingというラベルも存在していますが、今回は使用していません。このラベルを自由に定義できるようになってくれると良いのですが、現時点(2021年1月)のmlflowではそのような機能はないようです。
学習結果(2回目)
学習のコードをもう1回実行した後のExperimentsの状態です。
モデル学習履歴に新たに3つのrunが登録されています。
Modelsの画面では、Latest VersionとしてVersion 2が登録されています。モデル自動更新は行わない手動更新を想定しているので、ProductionはVersion 1のままです。
モデル精度評価
最新の評価データに対する各モデルの予測精度を比較するため、評価用のモデル実行を行いExperimentsに記録します。モデル再学習よりは短いサイクルで定期バッチ実行、あるいは手動でバッチ実行することを想定しています。
以下のパーツはモデル学習と同じです。評価用データは本来は最新データとすべきですが、今回は学習用と同じデータを使っています。
# ライブラリのインポート
import pandas as pd
import numpy as np
import copy
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
import mlflow
# 各モデル評価の詳細情報の一時出力先ファイル名
eval_result_file_name = '/tmp/eval_result.txt'
# mlflow tracking server URL
mlflow_tracking_server_url = 'http://localhost:5000'
mlflow.set_tracking_uri(mlflow_tracking_server_url)
# mlflow tracking serverのクライアントインスタンス(Productionバージョン操作用)
# 以下はset_tracking_uriの後に実行するか、tracking_uri='http://localhost:5000'指定が必要。そうでない場合、不明なKeyErrorが発生する
client = mlflow.tracking.MlflowClient()
#client = mlflow.tracking.MlflowClient(tracking_uri=mlflow_tracking_server_url)
学習データテーブルでのモデルタイプ列名 ★★★ 大文字小文字の区別は要検討 ★★★
model_types_column_name = 'mtype'
# モデルタイプ
model_types = [
{'name': 'type1', 'detail': 'モデルパターンの詳細説明1'},
{'name': 'type2', 'detail': 'モデルパターンの詳細説明2'},
{'name': 'type3', 'detail': 'モデルパターンの詳細説明3'}
]
# 目的変数列名
target_val_colmun_name = 'target'
# 説明変数列名
feature_val_column_names = [
'sepal length (cm)',
'sepal width (cm)',
'petal length (cm)',
'petal width (cm)'
]
# サンプルデータとしてirisを使う。合計150件
from sklearn.datasets import load_iris
iris = load_iris()
df = pd.DataFrame(iris.data, columns=iris.feature_names)
df['target'] = iris.target_names[iris.target]
# モデルタイプ列は乱数で生成する
import random
random.seed(1)
mtype_list = []
for i in range(len(df)):
mtype_list.append(model_types[random.randint(0,len(model_types)-1)]['name'])
df[model_types_column_name] = mtype_list
# 現在日付の取得 (Experimentsでのrunの名前に使用)
from pytz import timezone
import datetime
now_datetime = datetime.datetime.now(timezone('Asia/Tokyo')).strftime('%Y%m%d_%H%M%S')
now_date = now_datetime[0:8]
ここから、評価用のモデル実行と精度測定です。まず評価対象のモデルを呼び出して辞書型変数に格納します。今回は簡素化のため、全モデルタイプの全バージョンを評価対象としています。
# 全モデルタイプの全バージョンを取得
loaded_models = {}
for model_type in model_types:
loaded_models_ver = {}
for i in range(1,100):
try:
loaded_models_ver['Version'+str(i)] = mlflow.sklearn.load_model('models:/'+model_type['name']+'/'+str(i))
except Exception as e:
loaded_models_ver['Version'+str(i)] = 'NONE'
loaded_models_ver['latest_version'] = i-1
break
loaded_models[model_type['name']] = loaded_models_ver
なお、上のコードでイマイチな箇所が1つあります。内側のfor文でバージョン番号を1〜100固定で回している部分です。
いろいろAPIを調べたのですが、モデルの最新バージョンの番号を取得する方法がうまく見つからず、しょうがなくバージョン番号を1つずつカウントアップしてモデルロードが失敗した時点の1つ前が最新バージョン、という判別のしかたにしています。
Modelsに登録されたモデルの最新バージョン番号を取得するスマートな方法をご存じの方がいたら、ぜひ教えて下さい。
(2020/1/5追記) MlflowClient()
を使ってsearch_model_versions("name='<name>'")
で探す方法を教えてもらいました。これでスマートにバージョン番号のリストを入手できそうです。後ほど検証できたら記事を更新します。
この時点のloaded_modelsをdisplayすると、以下のような状態になっています。
ここからモデルの呼び出しと予測実行および結果のmlflowによる記録です。学習時とかなり似ていますが、使用するモデルを上で呼び出したモデルに順次差し替えて予測実行しています。
# evaluation
for model_type in model_types:
# モデルタイプ毎のデータの切り出し
df_mtype = df[df[model_types_column_name] == model_type['name']]
# 説明変数・目的変数の分割
X = df_mtype[feature_val_column_names]
y = df_mtype[target_val_colmun_name]
# 評価結果のmlflowへの登録
mlflow_experiment_name = now_date+'評価_' + model_type['name']
mlflow.set_experiment(mlflow_experiment_name)
for i in range(1,100):
model = loaded_models[model_type['name']]['Version'+str(i)]
if model == 'NONE': break
# 評価データに対する精度
predicted = model.predict(X)
ac_score = accuracy_score(y, predicted)
con_matrix = confusion_matrix(y, predicted)
clf_report = classification_report(y, predicted)
mlflow_run_name = 'Version'+str(i)
with mlflow.start_run(run_name=mlflow_run_name) as run:
# パラメータとしてrunへ記録
mlflow.log_param('Version', i)
mlflow.log_param('モデルタイプ名', model_type['name'])
mlflow.log_param('評価データ件数', len(X))
mlflow.log_metric('accuracy',ac_score) # log_metricは日本語が使えないが比較をするためmetricにする
# モデル学習結果の詳細をファイルに出力し、runのartifactに登録
with open(eval_result_file_name, 'w') as f:
print('モデルタイプ名 :', model_type['name'], file=f)
print('モデルバージョン :', 'Version'+str(i), file=f)
print('目的変数 :', target_val_colmun_name, file=f)
print('説明変数 :', feature_val_column_names, file=f)
print('評価データ件数 :', len(X), '\n', file=f)
print('モデル精度(評価データ)', '\n', '------------------------------------------------', file=f)
print('混同行列(confusion_matrix) :', '\n', confusion_matrix(y,predicted), file=f)
print('目的変数ラベル', np.sort(y.unique()), '\n', file=f)
print('正解率(accuracy) :', '\n', ac_score, '\n', file=f)
print('精度レポート(classification_report) :', '\n', clf_report, '\n', file=f)
mlflow.log_artifact(eval_result_file_name)
評価結果
評価のコードを実行した後のmlflowのExperiments画面です。Experiments名を評価日+モデルタイプとしています。Runはモデルバージョン毎に記録されています。
ここで、比較したいRunにチェックを入れて"Compare"ボタンをクリックすると、
モデルのバージョン比較画面が表示されます。モデル運用の担当者がこの画面を見て、複数のバージョンの精度を確認するという想定です。
さらにaccruracyのリンクをクリックすると、以下のようにグラフで比較画面も表示されます。今回作成したモデルはVersion1も2も同じ学習データだったため、全く同じaccuracyとなっていますが、本番運用のシーンではここで視覚的に各バージョンの精度を比較できます。
運用モデル更新
自動更新方式にする場合は、学習のコードの最後でclient.transition_model_version_stage
を実行し生成モデルをProductionとして更新するようにすればOKです。
以下は、手動更新方式の場合です。
モデル評価結果をモデル運用担当者がmlflow画面上で確認した後、mlflow画面の手動操作で運用中のモデル(Production)を更新します。
Models画面でProductionにしたいモデルのバージョンを開き、右上の"Stage"プルダウンメニューから"Transition to Production"を選択します。
ポップアップが出るのでそのままOKをクリックします。
ちなみにArchivedは不要を意味する単なるラベル名で、すぐにモデルが削除される事はありません。モデルの削除はclient.delete_model_version
を使います。
Modelsの一覧画面を見ると、Version 2がProductionに変わったことを確認できます。
(補足) モデルのOnline実行
Modelsに登録されたProductionのモデルは、mlflow models serve
コマンドでサービスを起動することで、いわゆるOnline型のモデル実行環境が立ち上がります。
注意事項として、実行する前に環境変数MLFLOW_TRACKING_URI
でトラッキングサーバーを指定しておく必要があります。
https://mlflow.org/docs/latest/cli.html#mlflow-models
# 環境変数でトラッキングサーバーを指定
export MLFLOW_TRACKING_URI=http://localhost:5000
# モデル名 type1 のProductionバージョンを実行する
mlflow models serve -m models:/type1/Production -h 0.0.0.0 -p 7001
これにより、type1のProductionモデルは http://IPaddress:port/Invocations というURLで公開されます。ここにREST ClientからJSONで予測の入力データをPOSTすると、予測結果がレスポンスとして得られます。
REST Client (Insomnia) からのモデル予測実行例:
入力となるJSONデータは、pandasデータフレームのto_json
でorient='split'
とした形式が使用可能です。
例:学習データの1レコードをJSON化
X.head(1).to_json(orient='split',index=False)
# {"columns":["sepal length (cm)","sepal width (cm)","petal length (cm)","petal width (cm)"],"data":[[4.9,3.0,1.4,0.2]]}
以上。