はじめに
以下の記事で、Databricks のマネージド型MLflowを使ってモデルのトレーニング、ライフサイクル管理を行いました。
DatabricksでMLflowを使う① - ノートブック上での実験トラッキング -
DatabricksでMLflowを使う② - 実験パラメータとメトリクスの可視化 -
DatabricksでMLflowを使う③ - モデルのライフサイクル管理 -
今回はトレーニングとステージングを行ったモデルを別のノートブックから読み込みたいと思います。
イメージとしてはトレーニングしたモデルを Pyspark ユーザー定義関数として読み込み、 pyspark のデータフレームに対して分散処理をかけるという流れになります。
セットアップ
呼び出したいモデルに対して"Run ID"を読み込みます。
# run_id = "<run-id>"
run_id = "d35dff588112486fa1684f38******"
model_uri = "runs:/" + run_id + "/model"
scikit-learn モデルをロードする
実験済みのトレーニングモデルを MLflow API を利用してロードします。
import mlflow.sklearn
model = mlflow.sklearn.load_model(model_uri=model_uri)
model.coef_
次にトレーニングにも利用した糖尿病データセットを読み込んで"progression"のカラムを落とします。
そして、読み込んだ pandas データフレームを pyspark データフレームに変換しておきます。
# Import various libraries including sklearn, mlflow, numpy, pandas
from sklearn import datasets
import numpy as np
import pandas as pd
# Load Diabetes datasets
diabetes = datasets.load_diabetes()
X = diabetes.data
y = diabetes.target
# Create pandas DataFrame for sklearn ElasticNet linear_model
Y = np.array([y]).transpose()
d = np.concatenate((X, Y), axis=1)
cols = ['age', 'sex', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6', 'progression']
data = pd.DataFrame(d, columns=cols)
dataframe = spark.createDataFrame(data.drop(["progression"], axis=1))
MLflow モデルの呼び出し
MLflow API を使って Pyspark ユーザー定義関数としてトレーニング済みのモデルを呼び出します。
import mlflow.pyfunc
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
ユーザー定義関数を使って予測を行います。
predicted_df = dataframe.withColumn("prediction", pyfunc_udf('age', 'sex', 'bmi', 'bp', 's1', 's2', 's3', 's4', 's5', 's6'))
display(predicted_df)
Pyspark モデルを使って分散処理することができました。
おわりに
今回はトレーニング済みのモデルを MLflow API を使って呼び出し、Pyspark で分散処理させることができました。
Databricks では日々新しい機能がアップデートされており、どんどん使いやすくなっています。
これからも新しい機能がでたら追いかけていきたいと思います。