0
0

DatabricksとHugging Faceを用いた保険Q&Aにおける意図分類

Posted at

こちらのソリューションアクセラレータをウォークスルーします。

準備

こちらDownload all notebooksをクリックして、必要情報を入力するとinsurance-qa-nlp.zipをダウンロードできます。こちらをワークスペースにインポートします。

config/notebook-configでデータベースやパスを指定できます。私は以下のように変更しています。

config = {}
config["model_name"] = "insuranceqa_taka"
config["database_name"] = "insuranceqa_taka"
config["model_output_dir"] = "/tmp/insuranceqa_model"
config["pipeline_output_dir"] = "/tmp/insuranceqa_pipeline/artifacts"
config["model_artifact_path"] = "model"
config["main_path"] = "dbfs:/tmp/insurance_taka"
config["main_path_w_dbfs"] = "/dbfs/tmp/insurance_taka"
config["main_local_path"] = "file:/tmp/insurance"

このソリューションアクセラレータは、テーブルをHiveメタストアhive_metastore配下に作成します。

ノートブックRUNMEを実行します。すると、二つのクラスターとジョブが作成され、ジョブが実行されます。

  • ins_qa_cluster: データ処理などで用いるCPUクラスター
  • ins_qa_cluster_train: トレーニングで用いるGPUクラスター

Screenshot 2024-07-24 at 8.39.42.png
Screenshot 2024-07-24 at 8.40.28.png

以下では、実行されるそれぞれのノートブックを説明していきます。

01_explore

全体像の説明とトレーニングデータの準備を行います。

使用するクラスター: ins_qa_cluster

概要

COVID-19のパンデミック以降、さまざまな業界の企業はデジタル導入を加速していますが、保険会社は顧客からの変化し続ける需要に対応できているのでしょうか?

保険会社として、あなたはビジネス価値の創出に時間とリソースの大部分を投入できているのでしょうか?

保険ビジネスにおいてカスタマーサービスは重要なパーツとなっています。これは、複数のビジネスケースでも当てはまります: マーケティングから顧客の維持クレームなどです。

また、カスタマーサービスチームのトレーニングには時間と労力を要するにもかかわらず、他の部門と比べてカスタマーサービスチームの入れ替わりは非常に激しいものとなっています。さらに、保険会社は多くの場合、カスタマーサービスを他の企業にアウトソースしているという事実は、サービス品質一貫性の観点での課題を表しています。

これらのプロセスをデジタル化することで、保険会社は以下のことをシームレスに行えるようになります:

  • 待ち時間を削減することで顧客満足度を向上
  • 電話回数の量を削減することで、優れたインタラクティブ体験を提供
  • 電話代コストを削減
  • 少ないスタッフで多くのことを行えるようにすることで、オペレーションをスケール
  • 予算人的リソースを、オペレーション的なプロセスから、実際の製品価値創造にシフト

このソリューションアクセラレータでは、Interactive Voice Response (IVR)のストリームや、モバイルアプリ、SMS、Whatsappやその他のコミュニケーションチャネルと連携可能なバーチャルアシスタントからの非構造化テキストのピースに基づいて、顧客の意図を検知する機械学習ソリューションの開発とデプロイのためのクイックスタートとなっています。

ターゲットとするソリューション

使い始める

  • このアクセラレータの目的に合わせて、Insurance QA Datasetを活用します。
  • このノートブックでは、このデータセットをダウンロードして、traintestvalidationセットの複数のDeltaテーブルに取り込みます。
  • 次のノートブック(02_explore_clean.py)では、ちょっとしたデータ探索とクリーニングを行います。
  • 次に、我々のデータを用いてNLPモデル(distilbert-en)をファインチューニングします。
  • ファインチューニングしたモデルをラッピングするためのPandas UDFを作成します。これによって、静的/バッチデータソースやストリーミングデータソースの両方に対して予測結果を生成できるようになります。
  • 最後に、Databricksモデルサービングを活用することで、リアルタイム予測エンドポイントとしてモデルをデプロイします。
%pip install datasets
%run ./config/notebook-config

Insurance QA Datasetのダウンロード

from datasets import load_dataset

dataset = load_dataset("j0selit0/insurance-qa-en")
dataset
DatasetDict({
    train: Dataset({
        features: ['index', 'topic_en', 'question_en'],
        num_rows: 12888
    })
    valid: Dataset({
        features: ['index', 'topic_en', 'question_en'],
        num_rows: 1999
    })
    test: Dataset({
        features: ['index', 'topic_en', 'question_en'],
        num_rows: 1999
    })
})

データセットの確認

  • データセットがダウンロードできたので、コンテンツを見てみましょう。
  • 上のセルからは、主に興味の対象となる2つの列があることがわかります: question_entopic_en です。最初の列には、複数の保険トピックに関係する質問が含まれており、2つ目の列にはそれぞれの質問に対する分類/トピックが格納されています。
  • 先に進めて、トレーニングセットから同じデータサンプルを見てみましょう。Hugging Face Datasetsにはカスタムのフォーマット(DatasetDictionary)がありますが、幸運なことに、より直感的な方法でデータのコンテンツを探索できるようにしてくれる to_pandas() のようないくつか非常に使いやすい関数が存在しています。
display(dataset["train"].to_pandas().loc[:10, ["question_en", "topic_en"]])

Screenshot 2024-07-24 at 8.44.06.png

基本的なクリーニング

# 全てを小文字に変換し、余計なスペースを除外しましょう

import re
from datasets import ClassLabel

def clean(example: str) -> str:

  output = []
  for question in example["question_en"]:
    question_clean = question.lower()
    question_clean = re.sub(' {2,}', ' ', question_clean)
    output.append(question_clean)
  
  example["question_en"] = output
  return example

clean_dataset = dataset.map(lambda example: clean(example), batched = True)

# 列名を変更し、labels を ClassLabel に変換します

clean_dataset = clean_dataset.remove_columns(["index"])
clean_dataset = clean_dataset.rename_columns({"question_en": "text", "topic_en": "label"})
names = list(set(clean_dataset["train"]["label"]))
clean_dataset = clean_dataset.cast_column("label", ClassLabel(names = names))

# 以降のトレーニングのためにクリーニングしたデータセットを保存します
## pyarrowは直接DBFSに書き込めないので、クリーンなデータセットをはじめにドライバーのローカルに保存してから、DBFSにコピーします
local_path = "/tmp/insuranceqa"
dbutils.fs.rm(config["main_path"], True)
clean_dataset.save_to_disk(local_path)
dbutils.fs.cp(f"file:///{local_path}", config["main_path"], recurse = True)

データプロファイルの生成

# display関数を用いることで、データセットのデータプロファイルを容易に作成できます

display(dataset["train"].to_pandas())

Screenshot 2024-07-24 at 8.44.50.png

  • 上のプロファイルからは、life insuranceに関連する質問が非常に頻繁にあることに気づきます。保険会社としては、我々の聴衆(マーケティング、セールス、教育会社など)で、この側面を活用するためのアクションを取ることに興味を持つかもしれません。一方、我々のカスタマーサービスチームを強化、スケールさせる必要があることも示しています。より多くの洞察を導出するために、より広範なトピック/意図の分布を見ていきます。
  • これを行うためには、以下のセルを実行し、シンプルに"Table"タブの隣のプラスアイコンをクリックします。ここでは、X軸に topic_en カラムを含め、Y軸に index カラムの COUNT を含めた棒グラフを作成することができます。

Screenshot 2024-07-24 at 8.45.20.png

  • トレーニングセットにおけるトピック/意図の分布を見ることで、生命保険に加えて、自動車保険やメディケアが非常に共通したテーマであることがわかります。
  • 反対の方向に目を向け、あまり共通していない意図を見てみると、critical illness insurancelong term care insuranceother insurance をハイライトすることができます。ここでも、我々はこれら特定のビジネスラインの理解に興味を持ち、それらの利益マージンを比較したいと考えるかもしれません。これらのトピックに関する質問が少ないという事実は、お客様が人間のエージェントと会話することなしに、デジタルチャネルを通じて彼らの問題を解決したり、質問に回答することで、彼らが優れた仕事をしていることを意味しているのかもしれません。

ここで、HuggingFaceデータセットをPandasに変換し、Deltaテーブルに書き込むことでテーブルを作成します。データセットはお使いのドライバーのメモリーより大きくすることができないため、スケーラビリティの制約に直面する場合があります。

より現実的なシナリオにおいては、あなたのデータをSparkが並列処理できるようなフォーマットで提供しても構いません。

推論のためにDeltaにテストセットを保存

test_df = spark.createDataFrame(dataset["test"].to_pandas())
test_df.write.saveAsTable("questions", mode = "overwrite")

次のステップ

  • データセットを入手し、十分に探索したので、モデルのトレーニングに進みます。
  • 我々のモデルは12の異なるトピックに関する顧客の質問の分類に活用されます。
  • トレーニングをスタートするには、リポジトリの02_trainノートブックを参照してください。

02_train

モデルのトレーニングを行います。

使用するクラスター: ins_qa_cluster_train

%pip install -q datasets
dbutils.library.restartPython()
%run ./config/notebook-config

Distilbertの例

  • 以下のセルでは、動作原理を確認するためにシンプルにHugging Faceのテキスト分類パイプラインのインスタンスを作成しています。
  • 先に進めて、いくつかのテキストに対してサンプルの予測を実行します。
from transformers import pipeline

pipe = pipeline("text-classification")
pipe(["This restaurant is awesome", "This restaurant is awful"])
[{'label': 'POSITIVE', 'score': 0.9998743534088135},
 {'label': 'NEGATIVE', 'score': 0.9996669292449951}]
  • Hugging Faceが受け入れるフォーマットでデータセットを作成する必要があります。
  • 我々のデータがどのようにエンコードトークナイズされるのかを定義する必要があります。
  • 我々のモデルには異なる12のラベルがあります。この部分をカスタマイズするために、Hugging FaceのAutoModelForSequenceClassificationクラスを活用します。

データセットの読み込み

import datasets

local_path = "/tmp/insurance"
dbutils.fs.rm(f"file://{local_path}", recurse = True)
dbutils.fs.cp(config["main_path"], f"file://{local_path}", recurse = True)
dataset = datasets.load_from_disk(local_path)

データセットのトークナイズ

from transformers import AutoTokenizer

base_model = "distilbert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(base_model)

def tokenize(examples):
  return tokenizer(examples["text"], padding = True, truncation = True, return_tensors = "pt")

tokenized_dataset = dataset.map(tokenize, batched = True)

モデルのトレーニング

ここでは以下を行います:

  • Trainerオブジェクトの作成 - これはトレーニングを容易にするHugging Faceのヘルパークラスです
  • TrainingArgumentsオブジェクトのインスタンス作成
  • EarlyStoppingCallbackの作成 - これはモデルの過学習の回避に役立ちます
  • モデルのトレーニング
from transformers import AutoModelForSequenceClassification
import datasets

dataset = datasets.load_from_disk(config["main_path_w_dbfs"])
label2id = dataset["train"].features["label"]._str2int
id2label = dataset["train"].features["label"]._int2str

model = AutoModelForSequenceClassification.from_pretrained(
  base_model,
  num_labels = len(label2id),
  label2id = label2id,
  id2label = dict(enumerate(id2label))
)

DistilBERTアーキテクチャの確認

model
modelの中身 ``` DistilBertForSequenceClassification( (distilbert): DistilBertModel( (embeddings): Embeddings( (word_embeddings): Embedding(30522, 768, padding_idx=0) (position_embeddings): Embedding(512, 768) (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) (dropout): Dropout(p=0.1, inplace=False) ) (transformer): Transformer( (layer): ModuleList( (0): TransformerBlock( (attention): MultiHeadSelfAttention( (dropout): Dropout(p=0.1, inplace=False) (q_lin): Linear(in_features=768, out_features=768, bias=True) (k_lin): Linear(in_features=768, out_features=768, bias=True) (v_lin): Linear(in_features=768, out_features=768, bias=True) (out_lin): Linear(in_features=768, out_features=768, bias=True) ) (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) (ffn): FFN( (dropout): Dropout(p=0.1, inplace=False) (lin1): Linear(in_features=768, out_features=3072, bias=True) (lin2): Linear(in_features=3072, out_features=768, bias=True) (activation): GELUActivation() ) (output_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) ) (1): TransformerBlock( (attention): MultiHeadSelfAttention( (dropout): Dropout(p=0.1, inplace=False) (q_lin): Linear(in_features=768, out_features=768, bias=True) (k_lin): Linear(in_features=768, out_features=768, bias=True) (v_lin): Linear(in_features=768, out_features=768, bias=True) (out_lin): Linear(in_features=768, out_features=768, bias=True) ) (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) (ffn): FFN( (dropout): Dropout(p=0.1, inplace=False) (lin1): Linear(in_features=768, out_features=3072, bias=True) (lin2): Linear(in_features=3072, out_features=768, bias=True) (activation): GELUActivation() ) (output_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) ) (2): TransformerBlock( (attention): MultiHeadSelfAttention( (dropout): Dropout(p=0.1, inplace=False) (q_lin): Linear(in_features=768, out_features=768, bias=True) (k_lin): Linear(in_features=768, out_features=768, bias=True) (v_lin): Linear(in_features=768, out_features=768, bias=True) (out_lin): Linear(in_features=768, out_features=768, bias=True) ) (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) (ffn): FFN( (dropout): Dropout(p=0.1, inplace=False) (lin1): Linear(in_features=768, out_features=3072, bias=True) (lin2): Linear(in_features=3072, out_features=768, bias=True) (activation): GELUActivation() ) (output_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) ) (3): TransformerBlock( (attention): MultiHeadSelfAttention( (dropout): Dropout(p=0.1, inplace=False) (q_lin): Linear(in_features=768, out_features=768, bias=True) (k_lin): Linear(in_features=768, out_features=768, bias=True) (v_lin): Linear(in_features=768, out_features=768, bias=True) (out_lin): Linear(in_features=768, out_features=768, bias=True) ) (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) (ffn): FFN( (dropout): Dropout(p=0.1, inplace=False) (lin1): Linear(in_features=768, out_features=3072, bias=True) (lin2): Linear(in_features=3072, out_features=768, bias=True) (activation): GELUActivation() ) (output_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) ) (4): TransformerBlock( (attention): MultiHeadSelfAttention( (dropout): Dropout(p=0.1, inplace=False) (q_lin): Linear(in_features=768, out_features=768, bias=True) (k_lin): Linear(in_features=768, out_features=768, bias=True) (v_lin): Linear(in_features=768, out_features=768, bias=True) (out_lin): Linear(in_features=768, out_features=768, bias=True) ) (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) (ffn): FFN( (dropout): Dropout(p=0.1, inplace=False) (lin1): Linear(in_features=768, out_features=3072, bias=True) (lin2): Linear(in_features=3072, out_features=768, bias=True) (activation): GELUActivation() ) (output_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) ) (5): TransformerBlock( (attention): MultiHeadSelfAttention( (dropout): Dropout(p=0.1, inplace=False) (q_lin): Linear(in_features=768, out_features=768, bias=True) (k_lin): Linear(in_features=768, out_features=768, bias=True) (v_lin): Linear(in_features=768, out_features=768, bias=True) (out_lin): Linear(in_features=768, out_features=768, bias=True) ) (sa_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) (ffn): FFN( (dropout): Dropout(p=0.1, inplace=False) (lin1): Linear(in_features=768, out_features=3072, bias=True) (lin2): Linear(in_features=3072, out_features=768, bias=True) (activation): GELUActivation() ) (output_layer_norm): LayerNorm((768,), eps=1e-12, elementwise_affine=True) ) ) ) ) (pre_classifier): Linear(in_features=768, out_features=768, bias=True) (classifier): Linear(in_features=768, out_features=12, bias=True) (dropout): Dropout(p=0.2, inplace=False) ) ``` 

クラスの重みの定義

  • 前のノートブックでは、意図の分布に偏りがあることを確認しており、多くがlife-insuranceに関する質問となっています。
  • これは、モデルの汎化能力において課題となります。
  • トレーニングセットにおける頻度に応じて、特定の意図やトピックに対する間違った予測に罰則を加えるようにするために、クラスごとに異なる重みを定義することができます。
  • これを行うために、データセットの12の意図に対するクラスの重みを計算します。頻度が少ない意図ほど、間違った予測を生成した際の罰則が重くなります。
from collections import Counter

def get_class_weights(labels):

  counter = Counter(labels)
  item_count_dict = dict(counter.items())
  size = len(labels)
  weights = list({k: (size / v) for k, v in sorted(item_count_dict.items())}.values())
  return weights

weights = get_class_weights(dataset["train"]["label"])
import mlflow
import itertools
import torch

from transformers import (
  TrainingArguments,
  Trainer,
  DataCollatorWithPadding,
  EarlyStoppingCallback,
  ProgressCallback
)

# ここで、データセットにおける様々なサンプルを組み合わせ、同じ長さになるようにパディングを行うDataCollatorを定義します
data_collator = DataCollatorWithPadding(tokenizer)

# パフォーマンス指標が改善しなくなったらトレーニングを停止するように、EarlyStoppingCallbackを使います
early_stopping = EarlyStoppingCallback(
  early_stopping_patience = 3,
  early_stopping_threshold = 0.01
)

device = "cuda:0" if torch.cuda.is_available() else "cpu"
model.to(torch.device(device))

training_args = TrainingArguments(
  output_dir = "/tmp/insurance_qa",
  evaluation_strategy = "steps",
  eval_steps = 100,
  num_train_epochs = 1, # 性能を改善する際には増やします
  per_device_train_batch_size = 128,
  per_device_eval_batch_size = 128,
  load_best_model_at_end = True,
  learning_rate = 2e-5,
  weight_decay = 0.01,
  xpu_backend = "ccl",
  no_cuda = False if torch.cuda.is_available() else True
)

class CustomTrainer(Trainer):
  
  def __init__(self, **kwargs):
    super().__init__(**kwargs)

  def compute_loss(self, model, inputs, return_outputs=False):
    labels = inputs.get("labels")
    # フォワードパス
    outputs = model(**inputs)
    logits = outputs.get('logits')
    # カスタムロスの計算
    device = 0 if torch.cuda.is_available() else -1
    weights_tensor = torch.tensor(weights).to(device)
    loss_fct = torch.nn.CrossEntropyLoss(weight = weights_tensor)
    loss = loss_fct(logits.view(-1, self.model.config.num_labels), labels.view(-1))
    return (loss, outputs) if return_outputs else loss

trainer = CustomTrainer(
    model = model,
    args = training_args,
    train_dataset = tokenized_dataset["train"],
    eval_dataset = tokenized_dataset["valid"],
    data_collator = data_collator,
    callbacks = [early_stopping, ProgressCallback()]
)

モデルのトレーニングとアーティファクトの保存

from transformers import pipeline
import logging

class InsuranceQAModel(mlflow.pyfunc.PythonModel):

  def load_context(self, context):
    device = 0 if torch.cuda.is_available() else -1
    pipeline_path = context.artifacts["insuranceqa_pipeline"]
    model_path = context.artifacts["base_model"]
    self.pipeline = pipeline(
      "text-classification",
      model = context.artifacts["insuranceqa_pipeline"],
      config = context.artifacts["base_model"],
      device = device
    )
    
  def predict(self, context, model_input):
    try:
      logging.info(f"Model input: {model_input}")
      questions = list(model_input)

      results = self.pipeline(questions, truncation = True, batch_size = 8)
      labels = [result["label"] for result in results]
      logging.info(f"Model output: {labels}")
      return labels

    except Exception as exception:
      logging.error(f"Model input: {questions}, type: {str(type(questions))}")
      return {"error": str(exception)}
from transformers import pipeline
import transformers
import numpy as np

model_output_dir = "/tmp/insuranceqa_model"
pipeline_output_dir = "/tmp/insuranceqa_pipeline/artifacts"
model_artifact_path = "model"

#mlflow.set_experiment(experiment_name = "/Shared/insuranceqa_distilbert")

with mlflow.start_run() as run:
  trainer.train()
  trainer.save_model(model_output_dir)
  pipe = pipeline(
    "text-classification",
    model = model,
    config = model.config,
    batch_size = 8,
    tokenizer = tokenizer
  )
  pipe.save_pretrained(pipeline_output_dir)

  # カスタム PyFunc モデルの記録
  mlflow.pyfunc.log_model(
    artifacts = {
      "insuranceqa_pipeline": pipeline_output_dir,
      "base_model": model_output_dir
    },
    artifact_path = model_artifact_path,
    python_model = InsuranceQAModel(),
    pip_requirements = [
      f"""torch=={torch.__version__.split("+")[0]}""",
      f"""transformers=={transformers.__version__}"""
    ]
  )

初期の分析

  • 1エポックのみでテキスト分類モデルをトレーニングしているので、最適ではないパフォーマンスであることを理解する必要があります。
  • フォローアップのアクションにおいては、エポック数を歩訳すことができます - これによって、データセットにおける様々な部分からの洞察をモデルに提供するので、様々な意図も提供することになりますので、汎化性能が改善されます。

最もパフォーマンスの高いランを取得してモデルを登録

runs = mlflow.search_runs(
  order_by = ["start_time DESC"],
  filter_string = "attributes.status = 'FINISHED'"
)

target_run_id = runs.loc[0, "run_id"]
logged_model_uri = f"runs:/{target_run_id}/model"
loaded_model = mlflow.pyfunc.load_model(logged_model_uri)
loaded_model
mlflow.pyfunc.loaded_model:
  artifact_path: model
  flavor: mlflow.pyfunc.model
  run_id: c9c32dd970164b528c44589036a5efc3

予測テストの実行

loaded_model.predict(["my car broke, what should I do?"])

自動車保険と返ってきます。

['auto-insurance']

モデルの登録

モデルレジストリにモデルを登録します。

ここでは、ワークスペースモデルレジストリにモデルを登録しています。

from mlflow.tracking import MlflowClient
import pandas as pd
from typing import List
client = MlflowClient()

# モデルの登録
model_details = mlflow.register_model(logged_model_uri, config["model_name"])

# シンプルなテスト
model_uri = f"""models:/{config["model_name"]}/latest"""
pipeline = mlflow.pyfunc.load_model(model_uri = model_uri)

simple_df = pd.DataFrame(["hi I crashed my car"], columns = ["question_en"])
test_prediction = pipeline.predict(simple_df.question_en)
print(f"Prediction: {test_prediction}")

if test_prediction is not None:

  # レジストリのモデルを "Production" ステージに移行
  client.transition_model_version_stage(
    name = config["model_name"],
    version = model_details.version,
    stage="Production",
    archive_existing_versions=True
  )

03_inference

推論を実行します。

使用するクラスター: ins_qa_cluster

%pip install -q datasets
dbutils.library.restartPython()

セットアップ

%run ./config/notebook-config

推論する質問の読み込み

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

# 質問の読み込み
test_df = spark.sql("select question_en, topic_en from questions")

# 少なくともクラスターのワーカーのコア数まで並列度を増加 - データボリュームが多いほど、この値をワーカーのコア数の倍数に設定します
sc = spark._jsc.sc() 
worker_count = max(1, len([executor.host() for executor in sc.statusTracker().getExecutorInfos() ]) - 1) 
total_worker_cores = spark.sparkContext.defaultParallelism * worker_count
test_df = test_df.repartition(total_worker_cores)

モデルのロードとテスト予測の生成

from typing import List
import pandas as pd
from pyspark.sql import functions as F
from mlflow.tracking import MlflowClient
import mlflow

# モデルをロードし、UDFでラッピング
pipeline = mlflow.pyfunc.load_model(f"models:/{config['model_name']}/production")

def predict(questions: pd.Series) -> pd.Series:
  """前のステップで作成したパイプラインに対するラッパー関数です。"""

  result = pipeline.predict(questions.to_list())
  return pd.Series(result)

predict_udf = F.pandas_udf(predict, returnType = StringType())

# UDFを用いた推論の実施
test_df = (
  test_df
    .withColumn("predicted", predict_udf(F.col("question_en")))
)

予測結果をDeltaに保存

(
  test_df
    .write
    .saveAsTable(
      "predictions",
      mode = "overwrite",
      mergeSchema = True
    )
)

predictions = spark.sql("select * from predictions")
display(predictions)

Screenshot 2024-07-24 at 8.56.28.png

パフォーマンス指標の計算

# 正しい予測結果の数を計算し、予測結果合計数で割ります

import seaborn as sns
from matplotlib import pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format='retina'

pred_df = predictions.toPandas()
pred_df["hit"] = pd.to_numeric(pred_df["topic_en"] == pred_df["predicted"])
accuracy_per_intent = pred_df.groupby("topic_en").hit.mean().reset_index()

fig, ax = plt.subplots(nrows = 1, ncols = 1, figsize = (4,4))
plt.xticks(rotation = 45)
sns.barplot(
  y = accuracy_per_intent["topic_en"],
  x = accuracy_per_intent["hit"],
  palette = "mako",
  ax = ax
)
plt.title("Prediction Accuracy per Intent")

download.png

04_deploy

登録したモデルをモデルサービングエンドポイントにデプロイします。

使用するクラスター: ins_qa_cluster

Databricksモデルサービングを用いたリアルタイムモデルのデプロイ

  • このステップでは、トレーニングしたモデルをデプロイし、リアルタイムモデルサービングエンドポイントして登録します。
  • 現実世界のシナリオにおいては、これによって、REST APIとしてモデルを公開することができ、IVRのルーティングやチャットボットようなリアルタイムのユースケースを実現できるようになります。
%pip install -q datasets
dbutils.library.restartPython()

セットアップ

%run ./config/notebook-config
import mlflow
from mlflow.tracking import MlflowClient
import requests
import json

client = MlflowClient()
model_name = config["model_name"]
endpoint_name = f"{model_name}_v2"
model_info = client.get_latest_versions(name = model_name, stages = ["Production"])[0]

databricks_url = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().getOrElse(None)
databricks_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)
api_endpoint = "api/2.0/serving-endpoints"

headers = {
    'Authorization': 'Bearer {}'.format(databricks_token)
  }

def get_endpoint(endpoint_name: str) -> bool:

  payload = {"name": endpoint_name}

  response = requests.get(
    f"{databricks_url}/{api_endpoint}",
    data = json.dumps(payload),
    headers = headers
  ).json()

  endpoint_exists = endpoint_name in [endpoint["name"] for endpoint in response["endpoints"]]
  return endpoint_exists


def create_endpoint(endpoint_name: str, model_name: str, model_version: int) -> dict:

  payload = {
    "name": endpoint_name,
    "config": {
    "served_models": [{
      "model_name": model_name,
      "model_version": f"{model_info.version}",
      "workload_size": "Small",
      "scale_to_zero_enabled": "true"
      }]
    }
  }

  if not get_endpoint(endpoint_name):
    print("Endpoint doesn't exist, creating...")
    response = requests.post(
      f"{databricks_url}/{api_endpoint}",
      data = json.dumps(payload),
      headers = headers
    ).json()
  else:
    print("Endpoint exists, updating...")
    update_payload = {}
    update_payload["served_models"] = payload["config"]["served_models"]
    response = requests.put(
      f"{databricks_url}/{api_endpoint}/{endpoint_name}/config",
      data = json.dumps(update_payload),
      headers = headers
    ).json()

  return response

create_endpoint(
  endpoint_name = endpoint_name,
  model_name = model_info.name,
  model_version = model_info.version
)

モデルエンドポイントのデプロイメントステータスのチェック

import time
from IPython.display import clear_output

def check_endpoint_status(endpoint_name: str, max_retries: int = 1000, interval: int = 5) -> str:
  """Check the Model Serving deployment status at every time step defined with the interval parameters"""

  current_tries = 0

  while current_tries < max_retries:
    clear_output(wait = True)
    response_json = requests.get(
      f"{databricks_url}/{api_endpoint}",
      headers = headers
    ).json()
    endpoint_status = [
      endpoint for endpoint
      in response_json["endpoints"]
      if endpoint["name"] == endpoint_name
    ][0]
    current_state = endpoint_status["state"]["config_update"]

    if (current_state == "IN_PROGRESS"):
      print(f"Checking model deployment status, attempt {current_tries} of {max_retries} - current state: {current_state}")
    else:
      message = f"Model endpoint deployment result: {endpoint_status}"
      return message

    current_tries += 1
    time.sleep(interval)

check_endpoint_status(endpoint_name = endpoint_name)

しばらく待つと、エンドポイントが稼働します。

"Model endpoint deployment result: {'name': 'insuranceqa_taka_v2', 'creator': 'takaaki.yayoi@databricks.com', 'creation_timestamp': 1721707130000, 'last_updated_timestamp': 1721707130000, 'state': {'ready': 'READY', 'config_update': 'NOT_UPDATING'}, 'config': {'served_models': [{'name': 'insuranceqa_taka-1', 'model_name': 'insuranceqa_taka', 'model_version': '1'}], 'served_entities': [{'name': 'insuranceqa_taka-1', 'entity_name': 'insuranceqa_taka', 'entity_version': '1'}]}, 'id': '3e7622f4ca9f47c7bf7f42212f34eb0f', 'route_optimized': False, 'permission_level': 'CAN_MANAGE'}"

Screenshot 2024-07-24 at 8.59.43.png

REST APIを通じたエンドポイントへのクエリー

def test_prediction_endpoint(questions):
  endpoint_url = f"serving-endpoints/{endpoint_name}/invocations"
  payload = {"instances": questions}
  
  data_json = json.dumps(payload)
  print(data_json)
  headers["Content-Type"] = 'application/json'
  response = requests.request(
    method='POST',
    headers = headers,
    url = f"{databricks_url}/{endpoint_url}",
    data = data_json
  )
  if response.status_code != 200:
    raise Exception(f'Request failed with status {response.status_code}, {response.text}')
  return response.json()

test_questions = [
  "my car broke, what should I do?",
  "what is my life insurance coverage?",
  "can you send me my health insurance cover?"
]
test_prediction_endpoint(test_questions)

質問のそれぞれに対して適切な分類が行われていることがわかります。

{"instances": ["my car broke, what should I do?", "what is my life insurance coverage?", "can you send me my health insurance cover?"]}
{'predictions': ['auto-insurance', 'life-insurance', 'health-insurance']}

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0