こちらで紹介されているノートブックを実行していきます。
ノートブックはこちらで公開されています。2つのノートブックが含まれています。
最初のノートブックの1_anomaly_detection_pcaでは、ベンダーごとの製品名のエンべディングに対する主成分分析を行い、次元削減されたベンダーを特徴づける軸に製品をマッピングします。ベンダーの特徴(多数派)に合致している製品であれば、次元削減されたものをオリジナルに復元してもエラーは僅少となります。一方でベンダーの特徴から乖離している製品はオリジナルに復元した際のエラーが大きくなる(次元削減された多数派の製品を表現する軸では当該製品の特徴が表現できていない)という考え方に基づいて異常検知を行なっています。以下で実際に動かしていますが、この仕組みで思った以上に異常検知ができています。しかし、配送料(Delivery Fee
)は人の目で見れば「これは製品の配送料なのね」となりますが、PCAはあくまで主成分の数値表現しか見ないので、このような物も外れ値として判定されてしまいます。
この問題を解決するためにLLMを用いるのが二つ目のノートブックの2_hybrid_function-calling-anomaly-examplesです。PCAで検知されたベンダーの情報とfew-shotの例をLLMに与えることで、ビジネス文脈を踏まえて検知結果を是正させています。これがハイブリッドアプローチと呼ばれる理由です。興味深い。
主成分分析(PCA)によるエンべディングの異常検知
1_anomaly_detection_pcaを実行します。
このノートブックでは、ベンダー内の製品説明のエンべディングを分析することで、外れ値を特定する方法として主成分分析(PCA)を探索します。PCAは次元削減のための強力な手法を提供し、エンべディング空間での再構成誤差に基づいて異常を明らかにする可能性があります。PCAは外れ値検出のための多くの利用可能なアルゴリズムの1つに過ぎず、それぞれに独自の利点と制限があります。t-SNE、UMAP、または最近傍のような代替手法は、データとエンべディングの特性に応じて異なる洞察を提供する場合があります。したがって、PCAは効果的である可能性がありますが、万能ではなく、複数の手法を包括的に評価することが堅牢な異常検出には不可欠です。
コンピュート: このノートブックは、Databricksのシングルユーザークラスター、ランタイムバージョン15.3以上で実行することをお勧めします。
データ取り込み
後半で日本語を含むプロットを行うのでjapanize-matplotlib
をインストールします。
%pip install japanize-matplotlib
カタログやスキーマを変更する際には、./_resources/0_setup
を事前に編集しておきます。
%run ./_resources/0_setup
df_spark = spark.read.csv(config['vol_data_landing'],
header=True,
inferSchema=True,
sep=",")
display(df_spark)
ベンダーと製品のペアになっています。
Vendor | Products |
---|---|
Vendor 1 | Delivery Fee |
Vendor 1 | Bakery Tools |
Vendor 3 | Pet Grooming Services |
Vendor 4 | Delivery Charges |
Vendor 5 | Fishing Gear |
Vendor 7 | Cloud Software |
Vendor 9 | Delivery Fee |
Vendor 9 | Catering Services |
データ準備
import pandas as pd
df = df_spark.toPandas()
df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 194 entries, 0 to 193
Data columns (total 2 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 Vendor 194 non-null object
1 Products 194 non-null object
dtypes: object(2)
memory usage: 3.2+ KB
df['embedding_input'] = df['Products']
基盤モデルAPIでエンべディングモデルを呼び出します。
import mlflow.deployments
deploy_client = mlflow.deployments.get_deploy_client("databricks")
# テキストのエンべディングを取得する関数
def get_embedding(text):
response = deploy_client.predict(endpoint="databricks-gte-large-en", inputs={"input": [text]})
embedding = [e['embedding'] for e in response.data]
return embedding[0] if embedding else []
# 文字列からエンべディングを作成
df['embeddings'] = df['embedding_input'].apply(get_embedding)
import numpy as np
embeddings = np.array(df['embeddings'].tolist())
print("Shape of embeddings:", embeddings.shape)
Shape of embeddings: (194, 1024)
display(df)
Vendor | Products | embedding_input | embeddings |
---|---|---|---|
Vendor 1 | Delivery Fee | Delivery Fee | [-0.76611328125,0.270751953125,-0.876953125,...] |
Vendor 1 | Bakery Tools | Bakery Tools | [-0.30078125,-0.78857421875,-0.377685546875,0.10821533203125,...5] |
エンべディングに主成分分析を適用する
PCAを使用して異常を検出する関数を定義します。しきい値として、再構成誤差が99.9%を超えるポイントを外れ値としてマークします。しきい値の微調整は、使用ケースのニーズに基づいて行う必要があります。
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
embedding_column = "embeddings"
def detect_anomalies(group_df: pd.DataFrame) -> pd.DataFrame:
embeddings = np.vstack(group_df[embedding_column])
# サンプル数が1より大きいかどうかを確認
if embeddings.shape[0] > 1 and embeddings.shape[1] > 1:
# 埋め込みをスケーリング
scaler = StandardScaler()
embeddings_scaled = scaler.fit_transform(embeddings)
# PCAを適用
pca = PCA(n_components=min(embeddings_scaled.shape[0], embeddings_scaled.shape[1], 2), svd_solver='randomized', random_state=23)
reduced_embeddings = pca.fit_transform(embeddings_scaled)
# エンべディングを再構築
reconstructed_embeddings = pca.inverse_transform(reduced_embeddings)
# 再構築誤差を計算
reconstruction_error = np.mean((embeddings_scaled - reconstructed_embeddings) ** 2, axis=1)
# 異常値の閾値を定義(例:上位1%を異常値とする)
threshold = np.percentile(reconstruction_error, 99)
# 異常値を識別
group_df['is_anomaly'] = reconstruction_error > threshold
group_df['reconstruction_error'] = reconstruction_error
group_df[['pca1', 'pca2']] = reduced_embeddings
else:
# サンプルが十分でない場合、すべてを非異常としてマークするか、別の方法で処理
group_df['is_anomaly'] = False
group_df['reconstruction_error'] = np.nan
group_df[['pca1', 'pca2']] = [np.nan, np.nan]
return group_df
データ全体に対して効率的に異常検出器を実行する関数。
from pyspark.sql.functions import pandas_udf
# detect_anomalies関数をPandas UDFに変換
@pandas_udf("Vendor string, value double, embedding array<double>, is_anomaly boolean, reconstruction_error double, pca1 double, pca2 double")
def process_batch(group_df: pd.DataFrame) -> pd.DataFrame:
embedding_column = "embeddings"
return detect_anomalies(group_df, embedding_column)
from joblib import Parallel, delayed
def process_batch(filtered_df: pd.DataFrame) -> pd.DataFrame:
# 各グループに対して異常検知関数を並列で適用
results = Parallel(n_jobs=-1)(delayed(detect_anomalies)(group)
for _, group in filtered_df.groupby('Vendor'))
# 結果を連結
return pd.concat(results).reset_index(drop=True)
df_with_anomalies_all = process_batch(df)
df_with_anomalies_all.shape
(194, 8)
# カタログにPCA出力を保存
_=(
spark.createDataFrame(df_with_anomalies_all)
.write
.format("delta")
.mode('overwrite')
.option('overwriteSchema','true')
.saveAsTable(f"{config['catalog']}.{config['schema']}.pca_anomaly_detection_synthetic_data")
)
結果と可視化
# 異常値が特定されたユニークなベンダーの数をカウント
number_vendors_with_outliers = df_with_anomalies_all[df_with_anomalies_all['is_anomaly'] == True]['Vendor'].nunique()
print(f"異常値があるベンダーの総数: {number_vendors_with_outliers}")
異常値があるベンダーの総数: 6
# ベンダーごとに製品を表示し、それらが異常とマークされているかどうかを表示する関数
def print_anomalies(df, vendor):
total_count_outliers = df[(df['Vendor']== vendor) & (df['is_anomaly']== True)]['Products'].count()
outliers = df[(df['Vendor']== vendor) & (df['is_anomaly']== True)]['Products'].tolist()
print(f"{vendor} の異常値を含む合計行数: {total_count_outliers}")
print(f"{vendor} の製品の異常値: {set(outliers)}")
anomalies_vendor = df[df['Vendor']== vendor][['Vendor','Products','is_anomaly']].drop_duplicates()
return anomalies_vendor
import matplotlib.pyplot as plt
import japanize_matplotlib
def plot_pca_scatter(df_input, vendor):
plt.figure(figsize=(8, 6))
df = df_input[df_input['Vendor']== vendor]
# PCAコンポーネントと再構築誤差の散布図
scatter = plt.scatter(df['pca1'], df['pca2'], c=df['reconstruction_error'], cmap='viridis', alpha=0.7)
plt.colorbar(scatter, label='再構築誤差')
plt.title('製品埋め込みのPCAと再構築誤差')
plt.xlabel('PCAコンポーネント1')
plt.ylabel('PCAコンポーネント2')
# 製品名でプロットに注釈を付ける
for i, row in df.iterrows():
plt.annotate(row['Products'], (row['pca1'], row['pca2']),
fontsize=9, alpha=0.6, color='black')
plt.show()
plot_pca_scatter(df_with_anomalies_all,'Vendor 7')
Cloud Software
が外れ値として検知されています。
print_anomalies(df_with_anomalies_all,'Vendor 7')
Vendor 7 の異常値を含む合計行数: 1
Vendor 7 の製品の異常値: {'Cloud Software '}
Vender 7の提供製品の中では異質と判断されています。
plot_pca_scatter(df_with_anomalies_all,'Vendor 1')
print_anomalies(df_with_anomalies_all,'Vendor 1')
Vendor 1 の異常値を含む合計行数: 1
Vendor 1 の製品の異常値: {'Delivery Fee'}
Delivery Fee
が異質と判断されています。しかし、こちらで言及されているように配送料を異質と判断するのはビジネス的な文脈を考慮しておらず、これが従来のMLモデルの限界となっています。
このような限界を越えるために、次ではLLMも活用したハイブリッドアプローチを採用します。
PCAとLLMを用いた異常検知
2_hybrid_function-calling-anomaly-examplesを実行します。
PCAモデルは異常な購入を検出しますが、文脈や指示のセットを与えることで異常ではないと識別される購入があるかもしれません。このソリューションは、PCAモデルのスコアに基づいて異常な取引を持つベンダーを評価し、それらのベンダーから購入された製品が異常と識別されるかどうかについて評価結果の理由を提供するようにLLMに指示することでさらに改善できます。
環境と関数のセットアップ
コンピュート: このノートブックは、Databricksのシングルユーザークラスター、ランタイムバージョン15.3以上で実行することをお勧めします。
デモで使用するライブラリのインストール
%pip install --upgrade openai tenacity tqdm
dbutils.library.restartPython()
%run ./_resources/0_setup
モデルエンドポイントの選択
# 使用するモデルのエンドポイントID。すべてのエンドポイントが関数呼び出しをサポートしているわけではありません。
MODEL_ENDPOINT_ID = "databricks-meta-llama-3-3-70b-instruct"
APIクライアントのセットアップ
import concurrent.futures
import pandas as pd
from openai import OpenAI, RateLimitError
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
retry_if_exception,
) # エクスポネンシャルバックオフのため
from tqdm.notebook import tqdm
from typing import List, Optional
# エンドポイントと通信するためにトークンとワークスペースのベースFMAPI URLが必要です
fmapi_token = (
dbutils.notebook.entry_point.getDbutils()
.notebook()
.getContext()
.apiToken()
.getOrElse(None)
)
fmapi_base_url = (
f'https://{spark.conf.get("spark.databricks.workspaceUrl")}/serving-endpoints'
)
以下では、LLMが指定されたスキーマに従って応答するのを支援するヘルパー関数を定義します。
再試行エラーをバックオフで処理することを強くお勧めします。これにより、ペイパートークンのレート制限に達した場合でも、コードが適切に縮退します。
ヘルパー関数のセットアップ
openai_client = OpenAI(api_key=fmapi_token, base_url=fmapi_base_url)
# 注意: バックオフでエラーのリトライ対応を行うことを*強く*お勧めします。これによって、トークン課金のレート制限に引っかかった場合に適切にコードを縮退させます。
@retry(
wait=wait_random_exponential(min=1, max=30),
stop=stop_after_attempt(3),
retry=retry_if_exception(RateLimitError),
)
def call_chat_model(
prompt: str, temperature: float = 0.0, max_tokens: int = 100, **kwargs
):
"""チャットモデルを呼び出し、応答テキストまたはツール呼び出しを返します。"""
chat_args = {
"model": MODEL_ENDPOINT_ID,
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt},
],
"max_tokens": max_tokens,
"temperature": temperature,
}
chat_args.update(kwargs)
try:
chat_completion = openai_client.chat.completions.create(**chat_args)
response = chat_completion.choices[0].message
if response.tool_calls:
call_args = [c.function.arguments for c in response.tool_calls]
if len(call_args) == 1:
return call_args[0]
return call_args
return response.content
except Exception as e:
# print(f"Error: {e}")
return None
def call_in_parallel(func, prompts: List[str]) -> List:
"""すべてのプロンプトに対して並行してfunc(p)を呼び出し、応答を返します。"""
# これは、デフォルトのワークスペースレート制限をトリガーしないように、比較的小さなスレッドプールを使用します。
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
results = []
for r in tqdm(executor.map(func, prompts), total=len(prompts)):
results.append(r)
return results
def results_to_dataframe(products: List[str], responses: List[str]):
"""レビューとモデルの応答をデータフレームに結合し、表形式で表示します。"""
return pd.DataFrame({"Products list": products, "Model response": responses})
PCAモデルで異常が検出されたベンダーのデータを読み込む
df_spark = spark.table(f"{config['catalog']}.{config['schema']}.pca_anomaly_detection_synthetic_data")
df = df_spark.toPandas()
display(df)
Vendor | Products | embedding_input | embeddings | is_anomaly | reconstruction_error | pca1 | pca2 |
---|---|---|---|---|---|---|---|
Vendor 1 | Delivery Fee | Delivery Fee | [-0.76611328125,0.270751953125,-0.876953125,0.1845703125,...] | true | 1.1395140465383746 | 34.273987566080656 | 12.10339976854736 |
Vendor 1 | Bakery Tools | Bakery Tools | [-0.30078125,-0.78857421875,-0.377685546875,0.10821533203125,...] | false | 1.0907131703442676 | 36.13968292807728 | 13.253154682674277 |
# 異常と識別された購入を持つベンダーのリスト
vendors_with_anomalies = df[df['is_anomaly'] == True]['Vendor'].unique()
# 各異常を持つベンダーのためのLLM入力を保持する辞書を作成
llm_inputs = {}
# 異常を持つ各ベンダーをループし、ユニークな製品のリストを取得
for vendor in vendors_with_anomalies:
unique_products = df[df['Vendor'] == vendor]['Products'].unique().tolist()
# 辞書に追加
llm_inputs[vendor] = unique_products
# LLM入力を表示
for unique_products in llm_inputs.items():
display(unique_products)
display("-" * 50)
('Vendor 1',
['Delivery Fee',
'Bakery Tools ',
'Hire of Scaffolding',
'Hire of Ladders & Steps',
'Hire of Site Fencing'])
'--------------------------------------------------'
('Vendor 3',
['Pet Grooming Services ',
'Rental of Generators',
'Rental of Pumps',
'Rental of Compressors',
'Fuel Charges'])
'--------------------------------------------------'
('Vendor 4',
['Delivery Charges',
'Construction of Buildings',
'Onsite Civil Works',
'Hire of Cranes',
'Hire of Operated Plant',
'Installation of Flooring'])
'--------------------------------------------------'
('Vendor 5',
['Fishing Gear ',
'Supply of Office Furniture',
'Supply of Office Equipment',
'Office Supplies',
' Janitorial Services'])
'--------------------------------------------------'
('Vendor 7',
['Cloud Software ',
'Hire of Non-Operated Plant',
'Hire of Operated Plant',
'Hire of Site Welfare Facilities',
'Cranes Parts & Spares'])
'--------------------------------------------------'
('Vendor 9',
['Delivery Fee',
'Catering Services ',
'Rental of Aerial Lifts',
'Rental of Forklifts',
'Rental of Skid-Steer Loaders'])
'--------------------------------------------------'
LLMとツールを用いた異常検知
# LLMの出力形式を固定するためのツールを定義
tools = [
{
"type": "function",
"function": {
"name": "_outlier_detection",
"description": "製品リストの中で外れ値を特定します",
"parameters": {
"type": "object",
"properties": {
"outliers": {
"type": "array",
"items": {"type": "string"},
},
"reason": {
"type": "string",
"description": "アイテムが異常と識別される理由"
},
},
"required": [
"outliers",
"reason"
],
},
},
},
]
def prompt_with_outlier_tool(products: List[str]):
# 製品リストをLLMに適した文字列形式に変換
products_str = "\n".join(products)
prompt = OUTLIER_PROMPT_TEMPLATE.format(products=products_str)
return call_chat_model(prompt, tools=tools)
# いくつかの例を含むプロンプトを定義
OUTLIER_PROMPT_TEMPLATE = """あなたは架空の会社が提供する製品リストを分析していると想像してください。この会社は特定の種類の製品を専門としていますが、具体的な内容は示されていません。会社が提供する可能性のある製品の種類について合理的な仮定を使用してください。あなたのタスクは、リスト内の外れ値、つまり会社が通常提供する製品にうまく適合しない製品を特定することです。文脈と製品間の関係を考慮し、過度に狭いまたは文字通りの解釈に基づいて外れ値を特定するのは避けてください。製品が会社の焦点に合理的に関連しているか、またはその提供の自然な延長である可能性がある場合、それを外れ値と見なさないでください。すべての製品が一般的なテーマや焦点領域に合理的に適合する場合、外れ値をリストしないでください。
部品の配送に関連することが多いため、輸送、配送、宅配、貨物サービス、または料金を外れ値と見なさないでください。
出力はJSON形式である必要があります。
例:
- 製品: ["apple", "kiwi", "strawberry", "bread", "COURIER SERVICES"], 出力: {{"outliers": ["bread"], "reason": "breadは会社が専門とする果物の種類ではありません。"}}
- 製品: ["chair", "table", "sofa", "bed", "refrigerator", "tree"], 出力: {{"outliers": ["refrigerator", "tree"], "reason": "refrigeratorとtreeは典型的な家具ではなく、会社の焦点であると思われます。"}}
- 製品: ["Delivery Charges", "floor", "wall", "ceiling", "door", "window"], 出力: {{"outliers": []}}
# 製品
{products}
"""
llm_inputs.values()
dict_values([['Delivery Fee', 'Bakery Tools ', 'Hire of Scaffolding', 'Hire of Ladders & Steps', 'Hire of Site Fencing'], ['Pet Grooming Services ', 'Rental of Generators', 'Rental of Pumps', 'Rental of Compressors', 'Fuel Charges'], ['Delivery Charges', 'Construction of Buildings', 'Onsite Civil Works', 'Hire of Cranes', 'Hire of Operated Plant', 'Installation of Flooring'], ['Fishing Gear ', 'Supply of Office Furniture', 'Supply of Office Equipment', 'Office Supplies', ' Janitorial Services'], ['Cloud Software ', 'Hire of Non-Operated Plant', 'Hire of Operated Plant', 'Hire of Site Welfare Facilities', 'Cranes Parts & Spares'], ['Delivery Fee', 'Catering Services ', 'Rental of Aerial Lifts', 'Rental of Forklifts', 'Rental of Skid-Steer Loaders']])
プロンプトと関数を用いてLLMによる評価、判別を行います。
PRODUCT_INPUTS = list(llm_inputs.values())
# 新しいプロンプトを例とともに、すでに定義した関数に適用します
pd.set_option('display.max_colwidth', None)
results = call_in_parallel(prompt_with_outlier_tool, PRODUCT_INPUTS)
df_results = results_to_dataframe(PRODUCT_INPUTS, results)
MLflow Traceで挙動を確認できます。
display(df_results)
前の結果ではDelivery Fee
が異常と判別されていましたが、LLMによって関連するものであると是正されています。一方で二行目のPet Grooming Services
は外れ値として判断されています。
Products list | Model response |
---|---|
["Delivery Fee","Bakery Tools ","Hire of Scaffolding","Hire of Ladders & Steps","Hire of Site Fencing"] | {"outliers": [], "reason": "すべての製品は建設や建築に関連しているため、外れ値は見つかりませんでした。"} |
["Pet Grooming Services ","Rental of Generators","Rental of Pumps","Rental of Compressors","Fuel Charges"] | {"outliers": ["Pet Grooming Services"], "reason": "Pet Grooming Servicesは、機械や機器のレンタル、燃料料金などのサービスと比べて、会社の主な焦点から外れたサービスであるため、外れ値と見なされます。"} |
["Delivery Charges","Construction of Buildings","Onsite Civil Works","Hire of Cranes","Hire of Operated Plant","Installation of Flooring"] | {"outliers": [], "reason": "すべての製品は建設や建設関連サービスに合理的に関連しているため、外れ値は見つかりませんでした。"} |
["Fishing Gear ","Supply of Office Furniture","Supply of Office Equipment","Office Supplies"," Janitorial Services"] | { "outliers": ["Fishing Gear", "Janitorial Services"], "reason": "Fishing GearとJanitorial Servicesは、オフィス関連の製品やサービスに比べて外れ値であると考えられます。" } |
["Cloud Software ","Hire of Non-Operated Plant","Hire of Operated Plant","Hire of Site Welfare Facilities","Cranes Parts & Spares"] | {"outliers": [], "reason": "すべての製品は建設や建設機械に関連しているため、外れ値は見つかりませんでした。"} |
["Delivery Fee","Catering Services ","Rental of Aerial Lifts","Rental of Forklifts","Rental of Skid-Steer Loaders"] | { "outliers": ["Catering Services"], "reason": "Catering Servicesは建設機械のレンタルサービスと関連しない製品であるため、外れ値と見なされます。" } |