LoginSignup
4
4

[2024年版] Databricksにおける機械学習モデル構築のエンドツーエンドのサンプル

Posted at

こちらの記事を書いたのが2021年でした。3年前…。

現時点の機能を使って同じようにウォークスルーします。元ネタはこちらのチュートリアルです。

全体の流れ

この例では、ワインの性質に基づいて、ポルトガルの"Vinho Verde"ワインの品質を予測するモデルを構築します。UCI機械学習リポジトリのデータModeling wine preferences by data mining from physicochemical properties [Cortez et al., 2009]を活用します。

  1. データのインポート
  2. Seabornとmatplotlibによるデータの可視化
  3. 機械学習モデルをトレーニングする際に用いるハイパーパラメーター探索を並列で実行
  4. ハイパーパラメーター探索結果をMLflowで確認
  5. Unity Catalogにベストモデルを登録
  6. 登録済みモデルをSpark UDFとして別のデータセットに適用
  7. 低レーテンシーリクエストに対応するためのモデルサービングの実行

要件

このノートブックではDatabricks MLランタイムが必要です。以下のスペックのクラスターを使います。
Screenshot 2024-03-26 at 19.24.49.png

前回との違い

  • モデルをMLflowレジストリではなく、Unity Catalogで管理
  • サーバレスモデルサービングを利用
  • streamlitによるGUI連携
  • その他、GUIの変化など

ライブラリのインストール

MLflowを最新バージョンにして、streamlitもインストールしておきます。

%pip install --upgrade "mlflow-skinny[databricks]"
%pip install streamlit
dbutils.library.restartPython()
import re
from pyspark.sql.types import * 

# Username を取得
username_raw = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
# Username の英数字以外を除去し、全て小文字化。Username をファイルパスやデータベース名の一部で使用可能にするため。
username = re.sub('[^A-Za-z0-9]+', '', username_raw).lower()

# ファイル格納パス
work_path = f"dbfs:/tmp/databricks_handson/{username}"

# モデル名
# Unity Catalogに登録するので事前にカタログとスキーマを作成しておきます
model_name = "takaakiyayoi_catalog.wine_db.wine_model"

# パスとモデル名を表示
print(f"path_name: {work_path}")
print(f"model_name: {model_name}")
path_name: dbfs:/tmp/databricks_handson/takaakiyayoidatabrickscom
model_name: takaakiyayoi_catalog.wine_db.wine_model

データのインポート

このセクションでは、サンプルデータからpandasデータフレームにデータを読み込みます。このEDA周りは前回と変わってません。

import pandas as pd

white_wine = pd.read_csv("/dbfs/databricks-datasets/wine-quality/winequality-white.csv", sep=";")
red_wine = pd.read_csv("/dbfs/databricks-datasets/wine-quality/winequality-red.csv", sep=";")

ワインが赤ワインか白ワインかを示す"is_red"カラムを追加して、2つのデータフレームを1つのデータセットにマージします。

red_wine['is_red'] = 1
white_wine['is_red'] = 0

data = pd.concat([red_wine, white_wine], axis=0)

# カラム名から空白を削除
data.rename(columns=lambda x: x.replace(' ', '_'), inplace=True)

データの中身を確認します。Databricksではdisplay関数を用いることで、簡単にデータを可視化することができます。
Screenshot 2024-03-26 at 19.28.09.png

データの可視化

モデルをトレーニングする前に、Seaborn、matplotlibを用いてデータを可視化します。

目的変数のqualityのヒストグラムをプロットします。

import seaborn as sns
sns.distplot(data.quality, kde=False)

download.png

qualityは3から9に正規分布しているように見えます。

quality >= 7のワインを高品質と定義します。

high_quality = (data.quality >= 7).astype(int)
data.quality = high_quality

特徴量と2値ラベルの間の相関を見るにはボックスプロットが有用です。

import matplotlib.pyplot as plt

dims = (3, 4)

f, axes = plt.subplots(dims[0], dims[1], figsize=(25, 15))
axis_i, axis_j = 0, 0
for col in data.columns:
    if col == "is_red" or col == "quality":
        continue  # カテゴリ変数にボックスプロットは使用できません
    sns.boxplot(x=high_quality, y=data[col], ax=axes[axis_i, axis_j])
    axis_j += 1
    if axis_j == dims[1]:
        axis_i += 1
        axis_j = 0

download (1).png

上のボックスプロットから、いくつかの変数がqualityに対する単変量予測子として優れていることがわかります。

  • alcoholのボックスプロットにおいては、高品質ワインのアルコール含有量の中央値は、低品質のワインの75%パーセンタイルよりも大きな値となっています。
  • densityのボックスプロットにおいては、低品質ワインの密度は高品質ワインよりも高い値を示しています。密度は品質と負の相関があります。

データの前処理

モデルのトレーニングの前に、欠損値のチェックを行い、データをトレーニングデータとバリデーションデータに分割します。

data.isna().any()
fixed_acidity           False
volatile_acidity        False
citric_acid             False
residual_sugar          False
chlorides               False
free_sulfur_dioxide     False
total_sulfur_dioxide    False
density                 False
pH                      False
sulphates               False
alcohol                 False
quality                 False
is_red                  False
dtype: bool

欠損値はありませんでした。このようなデータの欠損値や統計は、dbutils.data.summarizedisplay関数の結果からもアクセスすることができます。

dbutils.data.summarize(data)

Screenshot 2024-03-26 at 19.31.09.png

トレーニングデータとテストデータを分割して準備は完了です。また、リネージを捕捉できるようにトレーニング/テストデータはUnity Catalog配下のDeltaテーブルとして保存します。

from sklearn.model_selection import train_test_split
import pyspark.pandas as ps

train, test = train_test_split(data, random_state=123)
X_train = train.drop(["quality"], axis=1)
X_test = test.drop(["quality"], axis=1)
y_train = train.quality
y_test = test.quality

# トレーニング/テストデータはUnity Catalog配下のDeltaテーブルとして保存します
training_data_table = "takaakiyayoi_catalog.wine_db.wine_training"
test_data_table = "takaakiyayoi_catalog.wine_db.wine_test"

ps.from_pandas(train).to_table(training_data_table, mode="overwrite")
ps.from_pandas(test).to_table(test_data_table, mode="overwrite")

# Deltaバージョンの取得
training_temp = spark.sql(f"DESCRIBE history {training_data_table} limit 1").collect()
training_latest_version = int(training_temp[0][0])
test_temp = spark.sql(f"DESCRIBE history {test_data_table} limit 1").collect()
test_latest_version = int(test_temp[0][0])

print("training table version:", training_latest_version)
print("test table version:", test_latest_version)

Deltaテーブルではデータのバージョンが管理されるので、データを更新するとバージョン番号が増加します。この例では、当該テーブルを何度か更新しているのでバージョン4が表示されています。

training table version: 4
test table version: 4

カタログエクスプローラでもバージョン履歴を確認することができます。
Screenshot 2024-03-26 at 19.36.23.png

ベースラインモデルの構築

出力が2値であり、複数の変数間での相互関係がある可能性があることから、このタスクにはランダムフォレスト分類器が適しているように見えます。

以下のコードでは、scikit-learnを用いてシンプルな分類器を構築します。モデルの精度を追跡するためにMLflowを用い、後ほど利用するためにモデルを保存します。

この後、モデルのバージョン管理を行うのですが、デフォルトではモデルはワークスペースのモデルレジストリに登録されます。これをUnity Catalogに切り替えるためにmlflow.set_registry_uri("databricks-uc")を宣言しています。また、トレーニングで使用したテーブルを記録するためにmlflow.log_inputを使用しています。

import mlflow
import mlflow.pyfunc
import mlflow.sklearn
import numpy as np
import sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from mlflow.models.signature import infer_signature
from mlflow.utils.environment import _mlflow_conda_env
import cloudpickle
import time

# UC配下のモデルとして登録
mlflow.set_registry_uri("databricks-uc")

# sklearnのRandomForestClassifierのpredictメソッドは、2値の分類結果(0、1)を返却します。
# 以下のコードでは、それぞれのクラスに属する確率を返却するpredict_probaを用いる、ラッパー関数SklearnModelWrapperを構築します。

class SklearnModelWrapper(mlflow.pyfunc.PythonModel):
  def __init__(self, model):
    self.model = model
    
  def predict(self, context, model_input):
    return self.model.predict_proba(model_input)[:,1]

  
# mlflow.start_runは、このモデルのパフォーマンスを追跡するための新規MLflowランを生成します。
# コンテキスト内で、使用されたパラメーターを追跡するためにmlflow.log_param、精度のようなメトリクスを追跡するために
# mlflow.log_metricを呼び出します。
with mlflow.start_run(run_name='untuned_random_forest'):
  n_estimators = 10
  model = RandomForestClassifier(n_estimators=n_estimators, random_state=np.random.RandomState(123))
  model.fit(X_train, y_train)

  # predict_probaは[prob_negative, prob_positive]を返却するので、出力を[:, 1]でスライスします。
  predictions_test = model.predict_proba(X_test)[:,1]
  auc_score = roc_auc_score(y_test, predictions_test)
  mlflow.log_param('n_estimators', n_estimators)
  # メトリックとしてROC曲線のAUCを使用します。
  mlflow.log_metric('auc', auc_score)
  wrappedModel = SklearnModelWrapper(model)
  # モデルの入出力スキーマを定義するシグネチャをモデルとともに記録します。
  # モデルがデプロイされた際に、入力を検証するためにシグネチャが用いられます。
  signature = infer_signature(X_train, wrappedModel.predict(None, X_train))

  # log_inputによるトレーニング/テストデータのロギング
  training_dataset = mlflow.data.load_delta(table_name=training_data_table, version=training_latest_version)
  test_dataset = mlflow.data.load_delta(table_name=test_data_table, version=test_latest_version)
  mlflow.log_input(training_dataset, context="training")
  mlflow.log_input(test_dataset, context="test")
  
  # MLflowにはモデルをサービングする際に用いられるconda環境を作成するユーティリティが含まれています。
  # 必要な依存関係がconda.yamlに保存され、モデルとともに記録されます。
  conda_env =  _mlflow_conda_env(
        additional_conda_deps=None,
        additional_pip_deps=["cloudpickle=={}".format(cloudpickle.__version__), "scikit-learn=={}".format(sklearn.__version__)],
        additional_conda_channels=None,
    )
  mlflow.pyfunc.log_model("random_forest_model", python_model=wrappedModel, conda_env=conda_env, signature=signature)

MLflowによってモデルが記録されます。
Screenshot 2024-03-26 at 19.39.14.png

トレーニングに使用したテーブルがDatasets usedに表示されます。
Screenshot 2024-03-26 at 19.39.47.png

データチェックとして、モデルによって出力される特徴量の重要度を確認します。

feature_importances = pd.DataFrame(model.feature_importances_, index=X_train.columns.tolist(), columns=['importance'])
feature_importances.sort_values('importance', ascending=False)

先ほどボックスプロットで見たように、品質を予測するのにアルコールと密度が重要であることがわかります。
Screenshot 2024-03-26 at 19.40.45.png

MLflowにROC曲線のAUCを記録しました。画面右のフラスコアイコンをクリックして、エクスペリメントランのサイドバーを表示します。
Screenshot 2024-03-26 at 19.42.04.png

このモデルはAUC0.89を達成しました。

ランダムな分類器のAUCは0.5となり、それよりAUCが高いほど優れていると言えます。詳細は、Receiver Operating Characteristic Curveを参照ください。

Unity Catalogにモデルを登録

Unity Catalogにモデルを登録することで、Databricksのどこからでもモデルを容易に参照できるようになります。

以下のセクションでは、どのようにプログラム上から操作をするのかを説明しますが、UIを用いてモデルを登録することもできます。"Unity Catalogでモデルのライフサイクルを管理する" (AWS|Azure)を参照ください。

まず、MLflowによって記録されたトレーニング(ラン)にアクセスします。

run_id = mlflow.search_runs(filter_string='tags.mlflow.runName = "untuned_random_forest"').iloc[0].run_id
run_id
'c84defcd6aa54f33a1978b099e761ed9'
# モデルレジストリにモデルを登録します
model_version = mlflow.register_model(f"runs:/{run_id}/random_forest_model", model_name)
Registered model 'takaakiyayoi_catalog.wine_db.wine_model' already exists. Creating a new version of this model...

Created version '7' of model 'takaakiyayoi_catalog.wine_db.wine_model'.

上のメッセージからわかるように、このモデルはバージョン7として登録されました。

カタログエクスプローラでモデルを確認できるはずです。カタログエクスプローラを表示するには、左のサイドバーでカタログアイコンをクリックします。
Screenshot 2024-03-26 at 19.44.44.png

バージョン7のモデルの依存関係タブを確認すると、トレーニングで用いられたテーブルとのリネージが表示されます。
Screenshot 2024-03-26 at 19.45.40.png
Screenshot 2024-03-26 at 19.45.54.png

次に、このモデルにエイリアスchamiponを作成し、モデルレジストリからモデルをこのノートブックにロードします。

from mlflow.tracking import MlflowClient

client = MlflowClient()

# モデル"takaakiyayoi_catalog.wine_db.wine_model"の最新バージョンに対してエイリアス"champion"を作成します
client.set_registered_model_alias(model_name, "champion", model_version.version)

先ほどはバージョン6にエイリアスchampionが付与されていましたが、今度はバージョン7に付与されています。
Screenshot 2024-03-26 at 19.46.34.png

これで、"models:/takaakiyayoi_catalog.wine_db.wine_model@champion"のパスでモデルを参照することができます。

model = mlflow.pyfunc.load_model(f"models:/{model_name}@champion")

# サニティチェック: この結果はMLflowで記録されたAUCと一致すべきです
print(f'AUC: {roc_auc_score(y_test, model.predict(X_test))}')
AUC: 0.888902759745664

新たなモデルを用いたエクスペリメント

ハイパーパラメーターチューニングを行わなくても、このランダムフォレストモデルはうまく動きました。

以下のコードでは、より精度の高いモデルをトレーニングするためにxgboostライブラリを使用します。HyperoptとSparkTrialsを用いて、複数のモデルを並列にトレーニングするために、ハイパーパラメーター探索を並列で処理します。上のコードと同様に、パラメーター設定、パフォーマンスをMLflowでトラッキングします。

from hyperopt import fmin, tpe, hp, SparkTrials, Trials, STATUS_OK
from hyperopt.pyll import scope
from math import exp
import mlflow.xgboost
import numpy as np
import xgboost as xgb

# ハイパーパラメータの探索空間
search_space = {
  'max_depth': scope.int(hp.quniform('max_depth', 4, 100, 1)),
  'learning_rate': hp.loguniform('learning_rate', -3, 0),
  'reg_alpha': hp.loguniform('reg_alpha', -5, -1),
  'reg_lambda': hp.loguniform('reg_lambda', -6, -1),
  'min_child_weight': hp.loguniform('min_child_weight', -1, 3),
  'objective': 'binary:logistic',
  'seed': 123, # トレーニングの再現性を確保するためにシードを設定します。
}

def train_model(params):
  # MLflowのオートロギングによって、ハイパーパラメーターとトレーニングしたモデルは自動的にMLflowに記録されます。
  mlflow.xgboost.autolog()
  with mlflow.start_run(nested=True):
    train = xgb.DMatrix(data=X_train, label=y_train)
    test = xgb.DMatrix(data=X_test, label=y_test)
    # xgbが評価メトリクスを追跡できるようにテストセットを渡します。XGBoostは、評価メトリクスに改善が見られなくなった際にトレーニングを中止します。
    booster = xgb.train(params=params, dtrain=train, num_boost_round=1000,\
                        evals=[(test, "test")], early_stopping_rounds=50)
    predictions_test = booster.predict(test)
    auc_score = roc_auc_score(y_test, predictions_test)
    mlflow.log_metric('auc', auc_score)

    signature = infer_signature(X_train, booster.predict(train))
    mlflow.xgboost.log_model(booster, "model", signature=signature)
    
    # fminがauc_scoreを最大化するようにlossに-1*auc_scoreを設定します。
    return {'status': STATUS_OK, 'loss': -1*auc_score, 'booster': booster.attributes()}

# 並列度が高いほどスピードを改善できますが、ハイパーパラメータの探索において最適とは言えません。
# max_evalsの平方根が並列度の妥当な値と言えます。
spark_trials = SparkTrials(parallelism=10)

# "xgboost_models"という親のランの子ランとして、それぞれのハイパーパラメーターの設定が記録されるようにMLflowランのコンテキスト内でfminを実行します。
with mlflow.start_run(run_name='xgboost_models'):

  # トレーニング/テストデータのロギング
  mlflow.log_input(training_dataset, context="training_dataset")
  mlflow.log_input(test_dataset, context="test_dataset")

  best_params = fmin(
    fn=train_model, 
    space=search_space, 
    algo=tpe.suggest, 
    max_evals=96, 
    trials=spark_trials, 
    rstate=np.random.default_rng(123)
  )

MLflowを用いて結果を確認

Experiment Runsサイドバーを開いて、ランを参照します。メニューを表示するために、下向き矢印の隣にあるDateをクリックしaucを選択し、aucメトリックの順でランを並び替えます。一番高いaucは0.92となっています。ベースラインモデルを上回りました!
Screenshot 2024-03-26 at 19.52.35.png
Screenshot 2024-03-26 at 19.53.46.png

MLflowはそれぞれのランのパフォーマンスメトリクスとパラメーターをトラッキングします。Experiment Runsサイドバーの一番上にある右上向きの矢印アイコンをクリックすることで、MLflowランの一覧に移動することができます。
Screenshot 2024-03-26 at 19.54.25.png

次に、どのようにハイパーパラメータの選択がAUCと相関しているのかを見てみましょう。"+"アイコンをクリックして、親のランを展開し、親以外の全てのランを選択し、"Compare"をクリックします。Parallel Coordinates Plotを選択します。

メトリックに対するパラメーターのインパクトを理解するために、Parallel Coordinates Plotは有用です。プロットの右上にあるピンクのスライダーをドラッグすることで、AUCの値のサブセット、対応するパラメーターの値をハイライトすることができます。以下のプロットでは、最も高いAUCの値をハイライトしています。

Screenshot 2024-03-26 at 19.56.51.png

最もパフォーマンスの良かったランの全てが、reg_lambdalearning_rateにおいて低い値を示していることに注意してください。

これらのパラメーターに対してより低い値を探索するために、さらなるハイパーパラメーターチューニングを実行することもできますが、ここではシンプルにするために、そのステップをデモに含めていません。

それぞれのハイパーパラメーターの設定において生成されたモデルを記録するためにMLflowを用いました。以下のコードでは、最も高いパフォーマンスを示したランを検索し、Unity Catalogにモデルを登録します。

best_run = mlflow.search_runs(order_by=['metrics.auc DESC']).iloc[0]
print(f'AUC of Best Run: {best_run["metrics.auc"]}')
AUC of Best Run: 0.9207577767949284

Unity Catalogにあるchampionモデルを更新

はじめに、ベースラインモデルにエイリアスchampionを付与しました。さらに精度の高いモデルができましたので、エイリアスを更新します。

new_model_version = mlflow.register_model(f"runs:/{best_run.run_id}/model", model_name)

今回のモデルはバージョン8となります。

Created version '8' of model 'takaakiyayoi_catalog.wine_db.wine_model'.

カタログエクスプローラに前のモデルバージョンが存在することを確認します。

以下のコードで新バージョンにエイリアスchampionを付与します。

client.set_registered_model_alias(model_name, "champion", new_model_version.version)

バージョン8がchampionモデルになりました。
Screenshot 2024-03-26 at 20.00.24.png

何もコードを変更しなくても、load_modelを呼び出すクライアントは新たなモデルを受け取ります。

# このコードは上の"ベースラインモデルの構築"と同じものです。新たなモデルを利用するためにクライアント側での変更は不要です!
model = mlflow.pyfunc.load_model(f"models:/{model_name}@champion")
print(f'AUC: {roc_auc_score(y_test, model.predict(X_test))}')
AUC: 0.9207577767949284

バッチ推論

新たなデータのコーパスに対してモデルを評価したいというシナリオは数多く存在します。例えば、新たなデータバッチを手に入れたり、同じデータコーパスに対して二つのモデルを比較することなどが考えられます。

以下のコードでは、並列に処理を行うためにSparkを用い、Deltaテーブルに格納されたデータに対してモデルの評価を行います。

# 新たなデータコーパスをシミュレートするために、既存のX_trainデータをDeltaテーブルに保存します。
# 実際の環境では、本当に新たなデータバッチとなります。
spark_df = spark.createDataFrame(X_train)
# Deltaテーブルの保存先
table_path = f"{work_path}/delta/wine_data"
# すでにコンテンツが存在する場合には削除します
dbutils.fs.rm(table_path, True)
spark_df.write.format("delta").save(table_path)

モデルをSparkのUDF(ユーザー定義関数)としてロードし、Deltaテーブルに適用できるようにします。

import mlflow.pyfunc

apply_model_udf = mlflow.pyfunc.spark_udf(spark, f"models:/{model_name}@champion")
# 新規データをDeltaから読み込みます
new_data = spark.read.format("delta").load(table_path)
display(new_data)

Screenshot 2024-03-26 at 20.02.57.png

from pyspark.sql.functions import struct

# 新規データにモデルを適用します
udf_inputs = struct(*(X_train.columns.tolist()))

new_data = new_data.withColumn(
  "prediction",
  apply_model_udf(udf_inputs)
)
# それぞれの行には予測結果が紐づけられています。
# xgboostの関数はデフォルトでは確率を出力せず、予測結果が[0, 1]に限定されないことに注意してください。
display(new_data)

Screenshot 2024-03-26 at 20.03.48.png

# クリーンアップ
dbutils.fs.rm(table_path, True)

モデルサービング

低レーテンシーでの予測を行うようにモデルを運用するためには、サーバレスモデルサービング(AWS|Azure)を利用して、モデルをエンドポイントにデプロイします。

以下のコードでは、どのようにREST APIを用いてデプロイしたモデルから予測結果を得るのかを説明します。

モデルのエンドポイントにリクエストするためには、Databricksのトークンが必要です。(右上のプロファイルアイコンの下の)User Settingページでトークンを生成することができます。

トークンなど機密性の高い情報はノートブックに記述すべきではありません。シークレットに保存するようにしてください。

Databricksにおけるシークレットの管理 - Qiita

シークレットスコープの作成

databricks secrets create-scope demo-token-takaaki.yayoi --profile tokyo

シークレットの作成

databricks secrets put-secret --json '{
  "scope": "demo-token-takaaki.yayoi",
  "key": "token",
  "string_value": "<secret>"
}' --profile tokyo

ローカルマシンでDatabricks CLIを用いて上を実行してシークレットを作成しておきます。

import os

# 事前にCLIでシークレットにトークンを登録しておきます
token = dbutils.secrets.get("demo-token-takaaki.yayoi", "token")

os.environ["DATABRICKS_TOKEN"] = token

カタログエクスプローラで登録されているワインの予測モデルに移動します。このモデルをサービングをクリックします。ここではサービングエンドポイント名をtaka-wine-endpointとしています。

Screenshot 2024-03-26 at 20.07.45.png
Screenshot 2024-03-26 at 20.21.09.png

次に、Query Endopointで、リクエストを送信するためのPythonコードスニペットを表示するためにPythonボタンをクリックします。コードをこのノートブックにコピーします。次のセルと同じようなコードになるはずです。
Screenshot 2024-03-26 at 20.21.45.png

import os
import requests
import numpy as np
import pandas as pd

def create_tf_serving_json(data):
  return {'inputs': {name: data[name].tolist() for name in data.keys()} if isinstance(data, dict) else data.tolist()}

def process_input(dataset):
  if isinstance(dataset, pd.DataFrame):
    return {"dataframe_split": dataset.to_dict(orient='split') }
  elif isinstance(dataset, str):
    return dataset
  else:
    return create_tf_serving_json(dataset)

def score_model(dataset):
  
  #print(dataset)
  url = '<エンドポイントのURL>'
  headers = {'Authorization': f'Bearer {os.environ.get("DATABRICKS_TOKEN")}'}
  data_json = process_input(dataset)
  
  #print(data_json)
  
  response = requests.request(method='POST', headers=headers, url=url, json=data_json)
  if response.status_code != 200:
    raise Exception(f'Request failed with status {response.status_code}, {response.text}')
  return response.json()

エンドポイントから得られるモデルの予測結果と、ローカルでモデルを評価した結果は一致すべきです。

# モデルサービングは、低レーテンシーで予測を行うように設計されています。
num_predictions = 5
served_predictions = score_model(X_test[:num_predictions])
model_evaluations = model.predict(X_test[:num_predictions])

# トレーニングしたモデルとデプロイされたモデルの結果を比較します。
df1 = pd.DataFrame(model_evaluations)
df2 = pd.DataFrame(served_predictions)

df1.rename(columns={0: "Model Prediction"}, inplace=True)
df2.rename(columns={"predictions": "Served Model Prediction"}, inplace=True)

pd.concat([df1, df2], axis=1)

Screenshot 2024-03-26 at 20.22.22.png

GUIとの連携

このようにREST APIエンドポイントを立ち上げることで、GUIと連携させることができます。

ここではstreamlitを使ってGUIを構築しています。以下は別ファイルstreamlit.pyとして、ノートブックと同じパスに格納しておきます。

streamlit.py
import streamlit as st 
import numpy as np 
from PIL import Image
import base64
import io

import os
import requests
import numpy as np
import pandas as pd

import json

from databricks.sdk.runtime import dbutils

#st.title('ワイン品質予測モデル')
st.header('ワイン品質予測モデル')
st.write("![](https://sajpstorage.blob.core.windows.net/demo20210903-ml/22243068_s.jpg)")
st.write('[Databricksにおける機械学習モデル構築のエンドツーエンドのサンプル \- Qiita](https://qiita.com/taka_yayoi/items/f48ccd35e0452611d81b)') # markdown

# Copy and paste this code from the MLflow real-time inference UI. Make sure to save Bearer token from 
def create_tf_serving_json(data):
  return {'inputs': {name: data[name].tolist() for name in data.keys()} if isinstance(data, dict) else data.tolist()}

def process_input(dataset):
  if isinstance(dataset, pd.DataFrame):
    return {"dataframe_split": dataset.to_dict(orient='split') }
  elif isinstance(dataset, str):
    return dataset
  else:
    return create_tf_serving_json(dataset)

def score_model(dataset):
  # 1. パーソナルアクセストークンを設定してください
  token = dbutils.secrets.get("demo-token-takaaki.yayoi", "token")

  # 2. モデルエンドポイントのURLを設定してください
  url = '<エンドポイントのURL>'
  headers = {'Authorization': f'Bearer {token}'}
  #st.write(token)
  
  data_json = process_input(dataset)
   
  response = requests.request(method='POST', headers=headers, url=url, json=data_json)
  if response.status_code != 200:
    raise Exception(f'Request failed with status {response.status_code}, {response.text}')
  return response.json()

st.subheader('品質を予測したいワインの情報')

with st.sidebar:
  fixed_acidity = st.slider("固定酸性度", 0.0, 20.0, 5.6, 0.1, help="固定酸度ワインに含まれるほとんどの酸は不揮発性(すぐに蒸発しない)。")
  volatile_acidity = st.slider("揮発性酸性度", 0.0, 2.0, 0.66, 0.01, help="揮発性の酸度ワイン中の酢酸の量は、多すぎると不快な酢の味になる可能性があります。")
  citric_acid = st.slider("クエン酸", 1.0, 2.0, 0.0, 0.1, help="クエン酸が少量の場合、ワインに「新鮮さ」と「風味」を加えることができます。")
  residual_sugar = st.slider("残留糖", 1.0, 70.0, 2.2, 0.1, help="残留糖発酵が停止した後に残っている砂糖の量については、1グラム/リットル未満のワインと45グラム/リットルを超えるワインが甘いとみなされるのは稀です。")
  chlorides = st.slider("塩化物", 0.0, 1.0, 0.087, 0.001, help="")
  free_sulfur_dioxide = st.slider("遊離二酸化硫黄", 0, 300, 3, help="微生物の成長とワインの酸化を防ぎます")
  total_sulfur_dioxide = st.slider("総二酸化硫黄", 0, 500, 11, help="低濃度では、二酸化硫黄はワインではほとんど検出されませんが、50 ppmを超える遊離二酸化硫黄濃度では、ワインの香りや味に二酸化硫黄がはっきりと現れます。")
  density = st.slider("密度(%)", 0.0, 1.5, 0.993, 0.001, help="水の密度は、アルコールと糖分の割合によって、水の密度に近くなります。")
  pH = st.slider("pH", 0.0, 5.0, 3.71, 0.01, help="ほとんどのワインはpHが3-4の間です。")
  sulphates = st.slider("硫酸塩", 0.0, 2.0, 0.63, 0.01, help="抗菌剤および酸化防止剤として寄与することができるワイン添加物")
  alcohol = st.slider("アルコール度数(%)", 0.0, 15.0, 12.8, 0.1, help="ワインのアルコール含有量")

  is_red = st.radio("", ("赤ワイン", "白ワイン"), horizontal=True, args=[1, 0])

if is_red == "赤ワイン":
  is_red = 1
else:
  is_red = 0

#st.write(is_red)

parameter_df = pd.DataFrame(
    data={'fixed_acidity': fixed_acidity, 
          'volatile_acidity': volatile_acidity, 
          'citric_acid': citric_acid,
          'residual_sugar': residual_sugar,
          'chlorides': chlorides,
          'free_sulfur_dioxide': free_sulfur_dioxide,
          'total_sulfur_dioxide': total_sulfur_dioxide,
          'density': density,
          'pH': pH,
          'sulphates': sulphates,
          'alcohol': alcohol,
          'is_red': is_red},index=[0]
)

st.write(parameter_df)

response = score_model(parameter_df)
response_df = pd.DataFrame(response) 
#st.write(response)
parameter_df['predictions'] = response_df
#st.write(parameter_df)

st.subheader('モデルの予測結果')

probability = int(parameter_df['predictions'][0] * 100)
  
#st.write(f"このワインが高品質である確率は<font color='red'><b>{probability}%</b></font>です。", unsafe_allow_html=True)
st.metric(label="このワインが高品質である確率", value=f"{probability}%")

元のノートブックで以下を実行します。

from dbruntime.databricks_repl_context import get_context

def front_url(port):
    """
    フロントエンドを実行するための URL を返す

    Returns
    -------
    proxy_url : str
        フロントエンドのURL
    """
    ctx = get_context()
    proxy_url = f"https://{ctx.browserHostName}/driver-proxy/o/{ctx.workspaceId}/{ctx.clusterId}/{port}/"

    return proxy_url

PORT = 1501

# Driver ProxyのURLを表示
print(front_url(PORT))

# 利便性のためにリンクをHTML表示
displayHTML(f"<a href='{front_url(PORT)}' target='_blank' rel='noopener noreferrer'>別ウインドウで開く</a>")
https://xxx.databricks.com/driver-proxy/o/5099015744649857/0326-083932-5fyuqard/1501/

Screenshot 2024-03-26 at 20.25.39.png

こちらを実行した後で、上のリンクにアクセスします。

streamlit_file = "/Workspace/Users/takaaki.yayoi@databricks.com/20240325_uc_ml/streamlit.py"
!streamlit run {streamlit_file} --server.port {PORT}

Screenshot 2024-03-26 at 20.26.38.png

左のスライダーを動かすと、モデルサービングエンドポイントのモデルに問い合わせを行い、モデルの品質を予測します。
Screenshot 2024-03-26 at 20.27.31.png

以上で終了です。お疲れ様でした!

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

4
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
4