こちらのソリューションアクセラレータをウォークスルーします。
準備
こちらのDownload all notebooksをクリックして、必要情報を入力するとinsurance-qa-nlp.zipをダウンロードできます。こちらをワークスペースにインポートします。
config/notebook-config
でデータベースやパスを指定できます。私は以下のように変更しています。
config = {}
config["model_name"] = "insuranceqa_taka"
config["database_name"] = "insuranceqa_taka"
config["model_output_dir"] = "/tmp/insuranceqa_model"
config["pipeline_output_dir"] = "/tmp/insuranceqa_pipeline/artifacts"
config["model_artifact_path"] = "model"
config["main_path"] = "dbfs:/tmp/insurance_taka"
config["main_path_w_dbfs"] = "/dbfs/tmp/insurance_taka"
config["main_local_path"] = "file:/tmp/insurance"
このソリューションアクセラレータは、テーブルをHiveメタストアhive_metastore
配下に作成します。
ノートブックRUNME
を実行します。すると、二つのクラスターとジョブが作成され、ジョブが実行されます。
- ins_qa_cluster: データ処理などで用いるCPUクラスター
- ins_qa_cluster_train: トレーニングで用いるGPUクラスター
以下では、実行されるそれぞれのノートブックを説明していきます。
01_explore
全体像の説明とトレーニングデータの準備を行います。
使用するクラスター: ins_qa_cluster
概要
COVID-19のパンデミック以降、さまざまな業界の企業はデジタル導入を加速していますが、保険会社は顧客からの変化し続ける需要に対応できているのでしょうか?
保険会社として、あなたはビジネス価値の創出に時間とリソースの大部分を投入できているのでしょうか?
保険ビジネスにおいてカスタマーサービスは重要なパーツとなっています。これは、複数のビジネスケースでも当てはまります: マーケティングから顧客の維持、クレームなどです。
また、カスタマーサービスチームのトレーニングには時間と労力を要するにもかかわらず、他の部門と比べてカスタマーサービスチームの入れ替わりは非常に激しいものとなっています。さらに、保険会社は多くの場合、カスタマーサービスを他の企業にアウトソースしているという事実は、サービス品質と一貫性の観点での課題を表しています。
これらのプロセスをデジタル化することで、保険会社は以下のことをシームレスに行えるようになります:
- 待ち時間を削減することで顧客満足度を向上
- 電話回数の量を削減することで、優れたインタラクティブ体験を提供
- 電話代コストを削減
- 少ないスタッフで多くのことを行えるようにすることで、オペレーションをスケール
- 予算と人的リソースを、オペレーション的なプロセスから、実際の製品や価値創造にシフト
このソリューションアクセラレータでは、Interactive Voice Response (IVR)のストリームや、モバイルアプリ、SMS、Whatsappやその他のコミュニケーションチャネルと連携可能なバーチャルアシスタントからの非構造化テキストのピースに基づいて、顧客の意図を検知する機械学習ソリューションの開発とデプロイのためのクイックスタートとなっています。
ターゲットとするソリューション
使い始める
- このアクセラレータの目的に合わせて、Insurance QA Datasetを活用します。
- このノートブックでは、このデータセットをダウンロードして、train、test、validationセットの複数のDeltaテーブルに取り込みます。
- 次のノートブック(02_explore_clean.py)では、ちょっとしたデータ探索とクリーニングを行います。
- 次に、我々のデータを用いてNLPモデル(distilbert-en)をファインチューニングします。
- ファインチューニングしたモデルをラッピングするためのPandas UDFを作成します。これによって、静的/バッチデータソースやストリーミングデータソースの両方に対して予測結果を生成できるようになります。
- 最後に、Databricksモデルサービングを活用することで、リアルタイム予測エンドポイントとしてモデルをデプロイします。
%pip install datasets
%run ./config/notebook-config
Insurance QA Datasetのダウンロード
from datasets import load_dataset
dataset = load_dataset("j0selit0/insurance-qa-en")
dataset
DatasetDict({
train: Dataset({
features: ['index', 'topic_en', 'question_en'],
num_rows: 12888
})
valid: Dataset({
features: ['index', 'topic_en', 'question_en'],
num_rows: 1999
})
test: Dataset({
features: ['index', 'topic_en', 'question_en'],
num_rows: 1999
})
})
データセットの確認
- データセットがダウンロードできたので、コンテンツを見てみましょう。
- 上のセルからは、主に興味の対象となる2つの列があることがわかります: question_en と topic_en です。最初の列には、複数の保険トピックに関係する質問が含まれており、2つ目の列にはそれぞれの質問に対する分類/トピックが格納されています。
- 先に進めて、トレーニングセットから同じデータサンプルを見てみましょう。Hugging Face Datasetsにはカスタムのフォーマット(DatasetDictionary)がありますが、幸運なことに、より直感的な方法でデータのコンテンツを探索できるようにしてくれる to_pandas() のようないくつか非常に使いやすい関数が存在しています。
display(dataset["train"].to_pandas().loc[:10, ["question_en", "topic_en"]])
基本的なクリーニング
# 全てを小文字に変換し、余計なスペースを除外しましょう
import re
from datasets import ClassLabel
def clean(example: str) -> str:
output = []
for question in example["question_en"]:
question_clean = question.lower()
question_clean = re.sub(' {2,}', ' ', question_clean)
output.append(question_clean)
example["question_en"] = output
return example
clean_dataset = dataset.map(lambda example: clean(example), batched = True)
# 列名を変更し、labels を ClassLabel に変換します
clean_dataset = clean_dataset.remove_columns(["index"])
clean_dataset = clean_dataset.rename_columns({"question_en": "text", "topic_en": "label"})
names = list(set(clean_dataset["train"]["label"]))
clean_dataset = clean_dataset.cast_column("label", ClassLabel(names = names))
# 以降のトレーニングのためにクリーニングしたデータセットを保存します
## pyarrowは直接DBFSに書き込めないので、クリーンなデータセットをはじめにドライバーのローカルに保存してから、DBFSにコピーします
local_path = "/tmp/insuranceqa"
dbutils.fs.rm(config["main_path"], True)
clean_dataset.save_to_disk(local_path)
dbutils.fs.cp(f"file:///{local_path}", config["main_path"], recurse = True)
データプロファイルの生成
# display関数を用いることで、データセットのデータプロファイルを容易に作成できます
display(dataset["train"].to_pandas())
- 上のプロファイルからは、life insuranceに関連する質問が非常に頻繁にあることに気づきます。保険会社としては、我々の聴衆(マーケティング、セールス、教育会社など)で、この側面を活用するためのアクションを取ることに興味を持つかもしれません。一方、我々のカスタマーサービスチームを強化、スケールさせる必要があることも示しています。より多くの洞察を導出するために、より広範なトピック/意図の分布を見ていきます。
- これを行うためには、以下のセルを実行し、シンプルに"Table"タブの隣のプラスアイコンをクリックします。ここでは、X軸に topic_en カラムを含め、Y軸に index カラムの COUNT を含めた棒グラフを作成することができます。
- トレーニングセットにおけるトピック/意図の分布を見ることで、生命保険に加えて、自動車保険やメディケアが非常に共通したテーマであることがわかります。
- 反対の方向に目を向け、あまり共通していない意図を見てみると、critical illness insurance 、 long term care insurance 、 other insurance をハイライトすることができます。ここでも、我々はこれら特定のビジネスラインの理解に興味を持ち、それらの利益マージンを比較したいと考えるかもしれません。これらのトピックに関する質問が少ないという事実は、お客様が人間のエージェントと会話することなしに、デジタルチャネルを通じて彼らの問題を解決したり、質問に回答することで、彼らが優れた仕事をしていることを意味しているのかもしれません。
ここで、HuggingFaceデータセットをPandasに変換し、Deltaテーブルに書き込むことでテーブルを作成します。データセットはお使いのドライバーのメモリーより大きくすることができないため、スケーラビリティの制約に直面する場合があります。
より現実的なシナリオにおいては、あなたのデータをSparkが並列処理できるようなフォーマットで提供しても構いません。
推論のためにDeltaにテストセットを保存
test_df = spark.createDataFrame(dataset["test"].to_pandas())
test_df.write.saveAsTable("questions", mode = "overwrite")
次のステップ
- データセットを入手し、十分に探索したので、モデルのトレーニングに進みます。
- 我々のモデルは12の異なるトピックに関する顧客の質問の分類に活用されます。
- トレーニングをスタートするには、リポジトリの
02_train
ノートブックを参照してください。
02_train
モデルのトレーニングを行います。
使用するクラスター: ins_qa_cluster_train
%pip install -q datasets
dbutils.library.restartPython()
%run ./config/notebook-config
Distilbertの例
- 以下のセルでは、動作原理を確認するためにシンプルにHugging Faceのテキスト分類パイプラインのインスタンスを作成しています。
- 先に進めて、いくつかのテキストに対してサンプルの予測を実行します。
from transformers import pipeline
pipe = pipeline("text-classification")
pipe(["This restaurant is awesome", "This restaurant is awful"])
[{'label': 'POSITIVE', 'score': 0.9998743534088135},
{'label': 'NEGATIVE', 'score': 0.9996669292449951}]
- Hugging Faceが受け入れるフォーマットでデータセットを作成する必要があります。
- 我々のデータがどのようにエンコード、トークナイズされるのかを定義する必要があります。
- 我々のモデルには異なる12のラベルがあります。この部分をカスタマイズするために、Hugging Faceの
AutoModelForSequenceClassification
クラスを活用します。
データセットの読み込み
import datasets
local_path = "/tmp/insurance"
dbutils.fs.rm(f"file://{local_path}", recurse = True)
dbutils.fs.cp(config["main_path"], f"file://{local_path}", recurse = True)
dataset = datasets.load_from_disk(local_path)
データセットのトークナイズ
from transformers import AutoTokenizer
base_model = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(base_model)
def tokenize(examples):
return tokenizer(examples["text"], padding = True, truncation = True, return_tensors = "pt")
tokenized_dataset = dataset.map(tokenize, batched = True)
モデルのトレーニング
ここでは以下を行います:
-
Trainer
オブジェクトの作成 - これはトレーニングを容易にするHugging Faceのヘルパークラスです -
TrainingArguments
オブジェクトのインスタンス作成 -
EarlyStoppingCallback
の作成 - これはモデルの過学習の回避に役立ちます - モデルのトレーニング
from transformers import AutoModelForSequenceClassification
import datasets
dataset = datasets.load_from_disk(config["main_path_w_dbfs"])
label2id = dataset["train"].features["label"]._str2int
id2label = dataset["train"].features["label"]._int2str
model = AutoModelForSequenceClassification.from_pretrained(
base_model,
num_labels = len(label2id),
label2id = label2id,
id2label = dict(enumerate(id2label))
)
DistilBERTアーキテクチャの確認
model
modelの中身
``` DistilBertForSequenceClassification( (distilbert): DistilBertModel( (embeddings): Embeddings( (word_embeddings): Embedding(30522, 768, padding_idx=0) (position_embeddings): Embedding(512, 768) (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) (dropout): Dropout(p=0.1, inplace=False) ) (transformer): Transformer( (layer): ModuleList( (0): TransformerBlock( (attention): MultiHeadSelfAttention( (dropout): Dropout(p=0.1, inplace=False) (q_lin): Linear(in_features=768, out_features=768, bias=True) (k_lin): Linear(in_features=768, out_features=768, bias=True) (v_lin): Linear(in_features=768, out_features=768, bias=True) (out_lin): Linear(in_features=768, out_features=768, bias=True) ) (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) (ffn): FFN( (dropout): Dropout(p=0.1, inplace=False) (lin1): Linear(in_features=768, out_features=3072, bias=True) (lin2): Linear(in_features=3072, out_features=768, bias=True) (activation): GELUActivation() ) (output_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) ) (1): TransformerBlock( (attention): MultiHeadSelfAttention( (dropout): Dropout(p=0.1, inplace=False) (q_lin): Linear(in_features=768, out_features=768, bias=True) (k_lin): Linear(in_features=768, out_features=768, bias=True) (v_lin): Linear(in_features=768, out_features=768, bias=True) (out_lin): Linear(in_features=768, out_features=768, bias=True) ) (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) (ffn): FFN( (dropout): Dropout(p=0.1, inplace=False) (lin1): Linear(in_features=768, out_features=3072, bias=True) (lin2): Linear(in_features=3072, out_features=768, bias=True) (activation): GELUActivation() ) (output_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) ) (2): TransformerBlock( (attention): MultiHeadSelfAttention( (dropout): Dropout(p=0.1, inplace=False) (q_lin): Linear(in_features=768, out_features=768, bias=True) (k_lin): Linear(in_features=768, out_features=768, bias=True) (v_lin): Linear(in_features=768, out_features=768, bias=True) (out_lin): Linear(in_features=768, out_features=768, bias=True) ) (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) (ffn): FFN( (dropout): Dropout(p=0.1, inplace=False) (lin1): Linear(in_features=768, out_features=3072, bias=True) (lin2): Linear(in_features=3072, out_features=768, bias=True) (activation): GELUActivation() ) (output_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) ) (3): TransformerBlock( (attention): MultiHeadSelfAttention( (dropout): Dropout(p=0.1, inplace=False) (q_lin): Linear(in_features=768, out_features=768, bias=True) (k_lin): Linear(in_features=768, out_features=768, bias=True) (v_lin): Linear(in_features=768, out_features=768, bias=True) (out_lin): Linear(in_features=768, out_features=768, bias=True) ) (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) (ffn): FFN( (dropout): Dropout(p=0.1, inplace=False) (lin1): Linear(in_features=768, out_features=3072, bias=True) (lin2): Linear(in_features=3072, out_features=768, bias=True) (activation): GELUActivation() ) (output_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) ) (4): TransformerBlock( (attention): MultiHeadSelfAttention( (dropout): Dropout(p=0.1, inplace=False) (q_lin): Linear(in_features=768, out_features=768, bias=True) (k_lin): Linear(in_features=768, out_features=768, bias=True) (v_lin): Linear(in_features=768, out_features=768, bias=True) (out_lin): Linear(in_features=768, out_features=768, bias=True) ) (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) (ffn): FFN( (dropout): Dropout(p=0.1, inplace=False) (lin1): Linear(in_features=768, out_features=3072, bias=True) (lin2): Linear(in_features=3072, out_features=768, bias=True) (activation): GELUActivation() ) (output_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) ) (5): TransformerBlock( (attention): MultiHeadSelfAttention( (dropout): Dropout(p=0.1, inplace=False) (q_lin): Linear(in_features=768, out_features=768, bias=True) (k_lin): Linear(in_features=768, out_features=768, bias=True) (v_lin): Linear(in_features=768, out_features=768, bias=True) (out_lin): Linear(in_features=768, out_features=768, bias=True) ) (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) (ffn): FFN( (dropout): Dropout(p=0.1, inplace=False) (lin1): Linear(in_features=768, out_features=3072, bias=True) (lin2): Linear(in_features=3072, out_features=768, bias=True) (activation): GELUActivation() ) (output_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) ) ) ) ) (pre_classifier): Linear(in_features=768, out_features=768, bias=True) (classifier): Linear(in_features=768, out_features=12, bias=True) (dropout): Dropout(p=0.2, inplace=False) ) ```クラスの重みの定義
- 前のノートブックでは、意図の分布に偏りがあることを確認しており、多くが
life-insurance
に関する質問となっています。 - これは、モデルの汎化能力において課題となります。
- トレーニングセットにおける頻度に応じて、特定の意図やトピックに対する間違った予測に罰則を加えるようにするために、クラスごとに異なる重みを定義することができます。
- これを行うために、データセットの12の意図に対するクラスの重みを計算します。頻度が少ない意図ほど、間違った予測を生成した際の罰則が重くなります。
from collections import Counter
def get_class_weights(labels):
counter = Counter(labels)
item_count_dict = dict(counter.items())
size = len(labels)
weights = list({k: (size / v) for k, v in sorted(item_count_dict.items())}.values())
return weights
weights = get_class_weights(dataset["train"]["label"])
import mlflow
import itertools
import torch
from transformers import (
TrainingArguments,
Trainer,
DataCollatorWithPadding,
EarlyStoppingCallback,
ProgressCallback
)
# ここで、データセットにおける様々なサンプルを組み合わせ、同じ長さになるようにパディングを行うDataCollatorを定義します
data_collator = DataCollatorWithPadding(tokenizer)
# パフォーマンス指標が改善しなくなったらトレーニングを停止するように、EarlyStoppingCallbackを使います
early_stopping = EarlyStoppingCallback(
early_stopping_patience = 3,
early_stopping_threshold = 0.01
)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
model.to(torch.device(device))
training_args = TrainingArguments(
output_dir = "/tmp/insurance_qa",
evaluation_strategy = "steps",
eval_steps = 100,
num_train_epochs = 1, # 性能を改善する際には増やします
per_device_train_batch_size = 128,
per_device_eval_batch_size = 128,
load_best_model_at_end = True,
learning_rate = 2e-5,
weight_decay = 0.01,
xpu_backend = "ccl",
no_cuda = False if torch.cuda.is_available() else True
)
class CustomTrainer(Trainer):
def __init__(self, **kwargs):
super().__init__(**kwargs)
def compute_loss(self, model, inputs, return_outputs=False):
labels = inputs.get("labels")
# フォワードパス
outputs = model(**inputs)
logits = outputs.get('logits')
# カスタムロスの計算
device = 0 if torch.cuda.is_available() else -1
weights_tensor = torch.tensor(weights).to(device)
loss_fct = torch.nn.CrossEntropyLoss(weight = weights_tensor)
loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
return (loss, outputs) if return_outputs else loss
trainer = CustomTrainer(
model = model,
args = training_args,
train_dataset = tokenized_dataset["train"],
eval_dataset = tokenized_dataset["valid"],
data_collator = data_collator,
callbacks = [early_stopping, ProgressCallback()]
)
モデルのトレーニングとアーティファクトの保存
from transformers import pipeline
import logging
class InsuranceQAModel(mlflow.pyfunc.PythonModel):
def load_context(self, context):
device = 0 if torch.cuda.is_available() else -1
pipeline_path = context.artifacts["insuranceqa_pipeline"]
model_path = context.artifacts["base_model"]
self.pipeline = pipeline(
"text-classification",
model = context.artifacts["insuranceqa_pipeline"],
config = context.artifacts["base_model"],
device = device
)
def predict(self, context, model_input):
try:
logging.info(f"Model input: {model_input}")
questions = list(model_input)
results = self.pipeline(questions, truncation = True, batch_size = 8)
labels = [result["label"] for result in results]
logging.info(f"Model output: {labels}")
return labels
except Exception as exception:
logging.error(f"Model input: {questions}, type: {str(type(questions))}")
return {"error": str(exception)}
from transformers import pipeline
import transformers
import numpy as np
model_output_dir = "/tmp/insuranceqa_model"
pipeline_output_dir = "/tmp/insuranceqa_pipeline/artifacts"
model_artifact_path = "model"
#mlflow.set_experiment(experiment_name = "/Shared/insuranceqa_distilbert")
with mlflow.start_run() as run:
trainer.train()
trainer.save_model(model_output_dir)
pipe = pipeline(
"text-classification",
model = model,
config = model.config,
batch_size = 8,
tokenizer = tokenizer
)
pipe.save_pretrained(pipeline_output_dir)
# カスタム PyFunc モデルの記録
mlflow.pyfunc.log_model(
artifacts = {
"insuranceqa_pipeline": pipeline_output_dir,
"base_model": model_output_dir
},
artifact_path = model_artifact_path,
python_model = InsuranceQAModel(),
pip_requirements = [
f"""torch=={torch.__version__.split("+")[0]}""",
f"""transformers=={transformers.__version__}"""
]
)
初期の分析
- 1エポックのみでテキスト分類モデルをトレーニングしているので、最適ではないパフォーマンスであることを理解する必要があります。
- フォローアップのアクションにおいては、エポック数を歩訳すことができます - これによって、データセットにおける様々な部分からの洞察をモデルに提供するので、様々な意図も提供することになりますので、汎化性能が改善されます。
最もパフォーマンスの高いランを取得してモデルを登録
runs = mlflow.search_runs(
order_by = ["start_time DESC"],
filter_string = "attributes.status = 'FINISHED'"
)
target_run_id = runs.loc[0, "run_id"]
logged_model_uri = f"runs:/{target_run_id}/model"
loaded_model = mlflow.pyfunc.load_model(logged_model_uri)
loaded_model
mlflow.pyfunc.loaded_model:
artifact_path: model
flavor: mlflow.pyfunc.model
run_id: c9c32dd970164b528c44589036a5efc3
予測テストの実行
loaded_model.predict(["my car broke, what should I do?"])
自動車保険と返ってきます。
['auto-insurance']
モデルの登録
モデルレジストリにモデルを登録します。
ここでは、ワークスペースモデルレジストリにモデルを登録しています。
from mlflow.tracking import MlflowClient
import pandas as pd
from typing import List
client = MlflowClient()
# モデルの登録
model_details = mlflow.register_model(logged_model_uri, config["model_name"])
# シンプルなテスト
model_uri = f"""models:/{config["model_name"]}/latest"""
pipeline = mlflow.pyfunc.load_model(model_uri = model_uri)
simple_df = pd.DataFrame(["hi I crashed my car"], columns = ["question_en"])
test_prediction = pipeline.predict(simple_df.question_en)
print(f"Prediction: {test_prediction}")
if test_prediction is not None:
# レジストリのモデルを "Production" ステージに移行
client.transition_model_version_stage(
name = config["model_name"],
version = model_details.version,
stage="Production",
archive_existing_versions=True
)
03_inference
推論を実行します。
使用するクラスター: ins_qa_cluster
%pip install -q datasets
dbutils.library.restartPython()
セットアップ
%run ./config/notebook-config
推論する質問の読み込み
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType
# 質問の読み込み
test_df = spark.sql("select question_en, topic_en from questions")
# 少なくともクラスターのワーカーのコア数まで並列度を増加 - データボリュームが多いほど、この値をワーカーのコア数の倍数に設定します
sc = spark._jsc.sc()
worker_count = max(1, len([executor.host() for executor in sc.statusTracker().getExecutorInfos() ]) - 1)
total_worker_cores = spark.sparkContext.defaultParallelism * worker_count
test_df = test_df.repartition(total_worker_cores)
モデルのロードとテスト予測の生成
from typing import List
import pandas as pd
from pyspark.sql import functions as F
from mlflow.tracking import MlflowClient
import mlflow
# モデルをロードし、UDFでラッピング
pipeline = mlflow.pyfunc.load_model(f"models:/{config['model_name']}/production")
def predict(questions: pd.Series) -> pd.Series:
"""前のステップで作成したパイプラインに対するラッパー関数です。"""
result = pipeline.predict(questions.to_list())
return pd.Series(result)
predict_udf = F.pandas_udf(predict, returnType = StringType())
# UDFを用いた推論の実施
test_df = (
test_df
.withColumn("predicted", predict_udf(F.col("question_en")))
)
予測結果をDeltaに保存
(
test_df
.write
.saveAsTable(
"predictions",
mode = "overwrite",
mergeSchema = True
)
)
predictions = spark.sql("select * from predictions")
display(predictions)
パフォーマンス指標の計算
# 正しい予測結果の数を計算し、予測結果合計数で割ります
import seaborn as sns
from matplotlib import pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format='retina'
pred_df = predictions.toPandas()
pred_df["hit"] = pd.to_numeric(pred_df["topic_en"] == pred_df["predicted"])
accuracy_per_intent = pred_df.groupby("topic_en").hit.mean().reset_index()
fig, ax = plt.subplots(nrows = 1, ncols = 1, figsize = (4,4))
plt.xticks(rotation = 45)
sns.barplot(
y = accuracy_per_intent["topic_en"],
x = accuracy_per_intent["hit"],
palette = "mako",
ax = ax
)
plt.title("Prediction Accuracy per Intent")
04_deploy
登録したモデルをモデルサービングエンドポイントにデプロイします。
使用するクラスター: ins_qa_cluster
Databricksモデルサービングを用いたリアルタイムモデルのデプロイ
- このステップでは、トレーニングしたモデルをデプロイし、リアルタイムモデルサービングエンドポイントして登録します。
- 現実世界のシナリオにおいては、これによって、REST APIとしてモデルを公開することができ、IVRのルーティングやチャットボットようなリアルタイムのユースケースを実現できるようになります。
%pip install -q datasets
dbutils.library.restartPython()
セットアップ
%run ./config/notebook-config
import mlflow
from mlflow.tracking import MlflowClient
import requests
import json
client = MlflowClient()
model_name = config["model_name"]
endpoint_name = f"{model_name}_v2"
model_info = client.get_latest_versions(name = model_name, stages = ["Production"])[0]
databricks_url = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().getOrElse(None)
databricks_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)
api_endpoint = "api/2.0/serving-endpoints"
headers = {
'Authorization': 'Bearer {}'.format(databricks_token)
}
def get_endpoint(endpoint_name: str) -> bool:
payload = {"name": endpoint_name}
response = requests.get(
f"{databricks_url}/{api_endpoint}",
data = json.dumps(payload),
headers = headers
).json()
endpoint_exists = endpoint_name in [endpoint["name"] for endpoint in response["endpoints"]]
return endpoint_exists
def create_endpoint(endpoint_name: str, model_name: str, model_version: int) -> dict:
payload = {
"name": endpoint_name,
"config": {
"served_models": [{
"model_name": model_name,
"model_version": f"{model_info.version}",
"workload_size": "Small",
"scale_to_zero_enabled": "true"
}]
}
}
if not get_endpoint(endpoint_name):
print("Endpoint doesn't exist, creating...")
response = requests.post(
f"{databricks_url}/{api_endpoint}",
data = json.dumps(payload),
headers = headers
).json()
else:
print("Endpoint exists, updating...")
update_payload = {}
update_payload["served_models"] = payload["config"]["served_models"]
response = requests.put(
f"{databricks_url}/{api_endpoint}/{endpoint_name}/config",
data = json.dumps(update_payload),
headers = headers
).json()
return response
create_endpoint(
endpoint_name = endpoint_name,
model_name = model_info.name,
model_version = model_info.version
)
モデルエンドポイントのデプロイメントステータスのチェック
import time
from IPython.display import clear_output
def check_endpoint_status(endpoint_name: str, max_retries: int = 1000, interval: int = 5) -> str:
"""Check the Model Serving deployment status at every time step defined with the interval parameters"""
current_tries = 0
while current_tries < max_retries:
clear_output(wait = True)
response_json = requests.get(
f"{databricks_url}/{api_endpoint}",
headers = headers
).json()
endpoint_status = [
endpoint for endpoint
in response_json["endpoints"]
if endpoint["name"] == endpoint_name
][0]
current_state = endpoint_status["state"]["config_update"]
if (current_state == "IN_PROGRESS"):
print(f"Checking model deployment status, attempt {current_tries} of {max_retries} - current state: {current_state}")
else:
message = f"Model endpoint deployment result: {endpoint_status}"
return message
current_tries += 1
time.sleep(interval)
check_endpoint_status(endpoint_name = endpoint_name)
しばらく待つと、エンドポイントが稼働します。
"Model endpoint deployment result: {'name': 'insuranceqa_taka_v2', 'creator': 'takaaki.yayoi@databricks.com', 'creation_timestamp': 1721707130000, 'last_updated_timestamp': 1721707130000, 'state': {'ready': 'READY', 'config_update': 'NOT_UPDATING'}, 'config': {'served_models': [{'name': 'insuranceqa_taka-1', 'model_name': 'insuranceqa_taka', 'model_version': '1'}], 'served_entities': [{'name': 'insuranceqa_taka-1', 'entity_name': 'insuranceqa_taka', 'entity_version': '1'}]}, 'id': '3e7622f4ca9f47c7bf7f42212f34eb0f', 'route_optimized': False, 'permission_level': 'CAN_MANAGE'}"
REST APIを通じたエンドポイントへのクエリー
def test_prediction_endpoint(questions):
endpoint_url = f"serving-endpoints/{endpoint_name}/invocations"
payload = {"instances": questions}
data_json = json.dumps(payload)
print(data_json)
headers["Content-Type"] = 'application/json'
response = requests.request(
method='POST',
headers = headers,
url = f"{databricks_url}/{endpoint_url}",
data = data_json
)
if response.status_code != 200:
raise Exception(f'Request failed with status {response.status_code}, {response.text}')
return response.json()
test_questions = [
"my car broke, what should I do?",
"what is my life insurance coverage?",
"can you send me my health insurance cover?"
]
test_prediction_endpoint(test_questions)
質問のそれぞれに対して適切な分類が行われていることがわかります。
{"instances": ["my car broke, what should I do?", "what is my life insurance coverage?", "can you send me my health insurance cover?"]}
{'predictions': ['auto-insurance', 'life-insurance', 'health-insurance']}