はじめに
DatabricksでMLflowを使ったことが無かったため、勉強してみました。
KaggleのTitanic - Machine Learning from Disasterで使用されるデータを使っています。
尚、目的がMLflowの勉強ですので、データの前処理や分析、特徴量設計は詳細には実施しておりません。
準備
Databricksでカタログ、スキーマ、ボリュームを作成し、データをアップロードします。
まずは、ノートブック上で実行してみます。
(AutoMLを使った場合は、また別途書きます。)
MLflowをインストールします。
%pip install mlflow
そして、必要な条件を設定しておきます。
import mlflow
import pandas as pd
import numpy as np
mlflow.set_registry_uri("databricks-uc")
CATALOG_NAME = "<カタログ名>"
SCHEMA_NAME = "<スキーマ名>"
データの読み込み
pandas Dataframeとして読み込みます。
train_df = pd.read_csv("/Volumes/<カタログ名>/<スキーマ名>/titanic/train.csv")
test_df = pd.read_csv("/Volumes/<カタログ名>/<スキーマ名>/titanic/test.csv")
簡易データ分析
簡単に、データの内容を見てみます。
例えば、男性と女性について分けてみると、女性の方が、生き残っている割合が高いことが分かります。
women = train_df.loc[train_df.Sex == 'female']["Survived"]
men = train_df.loc[train_df.Sex == 'male']["Survived"]
rate_women = sum(women)/len(women)
rate_men = sum(men)/len(men)
print("% of women who survived:", rate_women)
print("% of men who survived:", rate_men)
機械学習
ここから機械学習に入っていきます。
今回は、ランダムフォレストとxgboostを使って機械学習を行います。
学習・検証・テストデータの作成
機械学習の前にデータを準備します。
from sklearn.model_selection import train_test_split
y = train_df.Survived
features = ["Pclass", "Sex", "SibSp", "Parch"]
X = pd.get_dummies(train_df[features])
X_train, X_rem, y_train, y_rem = train_test_split(X, y, train_size=0.6, random_state=123)
X_val, X_test, y_val, y_test = train_test_split(X_rem, y_rem, test_size=0.5, random_state=123)
ランダムフォレストでの実行
いよいよ機械学習に入っていきます。
以下のコードを実行します。
mlflow.start_runで、このモデルのパフォーマンスを追跡するために新しいMLflowランを作成します。
また、mlflow.log_paramを呼び出して使用されるパラメータを追跡し、mlflow.log_metricを使用して正確さなどのメトリクスを記録します。
import mlflow.pyfunc
import mlflow.sklearn
import sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env
import cloudpickle
import time
class SklearnModelWrapper(mlflow.pyfunc.PythonModel):
def __init__(self, model):
self.model = model
def predict(self, context, model_input):
return self.model.predict(model_input)
with mlflow.start_run(run_name='titanic_mlflow'):
n_estimators = 10
model = RandomForestClassifier(n_estimators=n_estimators, random_state=np.random.RandomState(123))
model.fit(X_train, y_train)
predictions_test = model.predict(X_test)
auc_score = roc_auc_score(y_test, predictions_test)
mlflow.log_param('n_estimators', n_estimators)
mlflow.log_metric('auc', auc_score)
wrappedModel = SklearnModelWrapper(model)
signature = infer_signature(X_train, wrappedModel.predict(None, X_train))
mlflow.pyfunc.log_model("random_forest_model", python_model=wrappedModel, signature=signature)
実行後、「エクスペリメント」にこの機械学習モデルが登録されます。
Unity Catalogへのモデル登録
Unity Catalogにモデルを登録します。
run_id = mlflow.search_runs(filter_string='tags.mlflow.runName = "titanic_mlflow"').iloc[0].run_id
model_name = f"{CATALOG_NAME}.{SCHEMA_NAME}.titanic_mlflow"
model_version = mlflow.register_model(f"runs:/{run_id}/titanic_mlflow", model_name)
今のモデルにChampionのタグを付けておきます。
from mlflow.tracking import MlflowClient
client = MlflowClient()
client.set_registered_model_alias(model_name, "Champion", model_version.version)
今作ったモデルをロードし、このモデルの学習及び検証データでの精度を確認します。
model = mlflow.pyfunc.load_model(f"models:/{model_name}@Champion")
print(f'学習データ AUC: {roc_auc_score(y_test, model.predict(X_test))}')
print(f'検証データ AUC: {roc_auc_score(y_val, model.predict(X_val))}')
xgboostでの実行
次に、xgboostを使用します。
以下を実行します。
%pip install xgboost
ハイパーパラメータの最適化の為に、hyperoptを利用します。
%pip install hyperopt
そして、以下のコードを実施します。
mlflow.xgboost.autolog()を使用すると、ハイパーパラメータと訓練済みモデルが自動的にMLflowに記録されます。
また、バリデーションセットを渡すことで、xgbは評価指標を追跡できます。XGBoostは評価指標が改善しなくなった時点でトレーニングを終了します。
そして、SparkTrialsを用いて高速化をしています。
最後に、"xgboost_models"という親ランの子ランとして、各ハイパーパラメータの設定がログに記録されるようにfminを実行します。
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK, SparkTrials
from hyperopt.pyll import scope
from math import exp
import mlflow.xgboost
import numpy as np
import xgboost as xgb
search_space = {
'max_depth': scope.int(hp.quniform('max_depth', 4, 100, 1)),
'learning_rate': hp.loguniform('learning_rate', -3, 0),
'reg_alpha': hp.loguniform('reg_alpha', -5, -1),
'reg_lambda': hp.loguniform('reg_lambda', -6, -1),
'min_child_weight': hp.loguniform('min_child_weight', -1, 3),
'objective': 'binary:logistic',
'seed': 123,
}
def train_model(params):
mlflow.xgboost.autolog()
with mlflow.start_run(nested=True):
train = xgb.DMatrix(data=X_train, label=y_train)
validation = xgb.DMatrix(data=X_val, label=y_val)
booster = xgb.train(params=params, dtrain=train, num_boost_round=1000,\
evals=[(validation, "validation")], early_stopping_rounds=50)
validation_predictions = booster.predict(validation)
auc_score = roc_auc_score(y_val, validation_predictions)
mlflow.log_metric('auc', auc_score)
signature = infer_signature(X_train, booster.predict(train))
mlflow.xgboost.log_model(booster, "model", signature=signature)
return {'status': STATUS_OK, 'loss': -1*auc_score, 'booster': booster.attributes()}
spark_trials = SparkTrials(parallelism=10)
with mlflow.start_run(run_name='titanic_mlflow_xgboost'):
best_params = fmin(
fn=train_model,
space=search_space,
algo=tpe.suggest,
max_evals=96,
trials=spark_trials,
)
best_run = mlflow.search_runs(order_by=['metrics.auc DESC']).iloc[0]
print(f'AUC of Best Run: {best_run["metrics.auc"]}')
改めて、Championのエイリアスを振りなおします。
new_model_version = mlflow.register_model(f"runs:/{best_run.run_id}/model", model_name)
client.set_registered_model_alias(model_name, "Champion", new_model_version.version)
このモデルは、以下のようにUnity Catalogに登録されています。
また、パラメータの情報も残されています。
最後に
始めてMLflowに踏み込みましたが、まだまだ色々なことができそうだと感じました。
MLflowのドキュメントを読みさらに勉強していきます!