用語説明
◆UMAP (Uniform Manifold Approximation and Projection)
UMAPは、高次元データを低次元空間へ効率的かつ効果的にマッピングする次元削減手法の一つです。t-SNEと同様に非線形なデータ構造を捉えられる一方、より高速で解釈しやすい点が特徴とされています。UMAPは、学習した低次元空間からinverse_transform
機能を用いて元の高次元空間に再投影できる点でも有用で、これにより再構成誤差を算出して異常度を評価することが可能です。
はじめに
近年、自然言語処理(NLP)分野では大規模言語モデル(LLM: Large Language Model)が急速に進歩し、さまざまなタスクで優れた成果を上げています。LLMはテキストベースの入力に対し、高次元かつ豊かな意味情報を内包する埋め込み表現を得ることが可能です。
このような埋め込み空間を活用すれば、単語や文書の分布特徴を捉え、そこから「通常の分布から逸脱した異常サンプル」を検出できます。
本記事では、LLM(ここではLlamaモデルを例示)によるテキスト埋め込み取得後、UMAPで次元削減した空間をinverse_transform
で再構成し、再構成誤差(元ベクトルと再構成ベクトルの差)を異常度指標とする方法を解説します。Trainデータ(正常のみ)でUMAPを学習し、Validationデータ(正常+異常)で再構成誤差を計算して閾値を決定した後、Testデータ(正常+異常)に適用して異常検出性能を評価します。
この記事で学べること
- LLM (Llamaモデル)を用いたキーワードの埋め込み取得方法
- UMAPを用いた高次元ベクトルの次元削減と再構成
- UMAPの
inverse_transform
による再構成誤差を異常度指標とする異常検知手法 - Validationセットでの閾値決定と、TestセットでのF1スコア、ROC AUC、PR-AUC評価
- PlotlyによるUMAP空間可視化
ソースコードの解説と実装手順
以下では、ソースコード解説と実装手順を示します。環境やデータパス、モデル指定は適宜調整してください。
- 正常データ:「Kaggle LLM 20 Questions」の「物カテゴリー」のキーワード
- 異常データ:カントー地方のポケモンの英語名(筆者が好んでいるため)
- LLMはLlama-3.1-8B-Instructを使用(筆者のLLM 20 Questionsのソリューションで用いたモデルを流用)
- UMAPは非線形構造を学習できる有用な次元削減手法として採用
全体のソースコードは下記から入手可能です:
1. 必要なライブラリのインポートとパラメーター設定
import numpy as np
import pandas as pd
import torch
import umap
from transformers import AutoTokenizer, LlamaModel
from sklearn.metrics import f1_score, roc_auc_score, average_precision_score
import plotly.graph_objs as go
# パラメータ設定
NORMAL_TRAIN_SIZE = 200
NORMAL_VALID_SIZE = 15
NORMAL_TEST_SIZE = 15
NORMAL_SIZE = NORMAL_TRAIN_SIZE + NORMAL_VALID_SIZE + NORMAL_TEST_SIZE
ANOMALY_VALID_SIZE = 15
ANOMALY_TEST_SIZE = 15
RANDOM_STATE = 0
np.random.seed(RANDOM_STATE)
解説:
- NumPy、Pandas、torch、UMAP、transformers、sklearn、Plotlyなど必要なライブラリをインポート。
- データセットのサイズや乱数シード値を設定し、再現性を確保します。
2. LLMモデル(ここではLlama)とTokenizerの読み込み
tokenizer = AutoTokenizer.from_pretrained("/kaggle/input/llama-3-1-8b-instruct-fix-json/fixed-llama-3.1-8b-instruct/")
model = LlamaModel.from_pretrained("/kaggle/input/llama-3-1-8b-instruct-fix-json/fixed-llama-3.1-8b-instruct/")
解説:
- LlamaモデルとTokenizerをロードします。今回はあらかじめ用意されたモデルを読み込みます。
3. データの読み込み・サンプリング・分割
nomaly_words = pd.read_csv("/kaggle/input/things-only-name-category-kw-csv/keywords_questions_table_1.csv") \
.sample(NORMAL_SIZE, random_state=RANDOM_STATE).keyword.unique().tolist()
anomaly_words = [
"Bulbasaur", "Charmander", "Squirtle", "Caterpie", "Weedle",
"Pidgey", "Rattata", "Spearow", "Ekans", "Pikachu",
"Sandshrew", "Nidoran♀", "Nidoran♂", "Clefairy", "Vulpix",
"Jigglypuff", "Zubat", "Oddish", "Paras", "Venonat",
"Diglett", "Meowth", "Psyduck", "Mankey", "Growlithe",
"Poliwag", "Abra", "Machop", "Bellsprout", "Tentacool",
]
normal_all = np.random.choice(nomaly_words, NORMAL_TRAIN_SIZE + NORMAL_VALID_SIZE + NORMAL_TEST_SIZE, replace=False)
anomaly_all = np.random.choice(anomaly_words, ANOMALY_VALID_SIZE + ANOMALY_TEST_SIZE, replace=False)
normal_train = normal_all[:NORMAL_TRAIN_SIZE]
normal_valid = normal_all[NORMAL_TRAIN_SIZE: NORMAL_TRAIN_SIZE+NORMAL_VALID_SIZE]
normal_test = normal_all[-NORMAL_TEST_SIZE:]
anomaly_valid = anomaly_all[:ANOMALY_VALID_SIZE]
anomaly_test = anomaly_all[-ANOMALY_TEST_SIZE:]
解説:
- CSVファイルから正常データ(物カテゴリーのキーワード)を取得し、異常データとしてカントー地方のポケモンの英語名を用意しています。Train、Validation、Testセットに分割します。
4. 埋め込み取得関数と埋め込み計算
def get_embedding(text: str) -> np.ndarray:
inputs = tokenizer(text, return_tensors="pt")
with torch.no_grad():
outputs = model(**inputs)
# 平均プーリング
embedding = outputs.last_hidden_state.mean(dim=1).squeeze().numpy()
return embedding
# 埋め込み取得(train/valid/test全て)
X_normal_train = np.array([get_embedding(w) for w in normal_train])
X_normal_valid = np.array([get_embedding(w) for w in normal_valid])
X_normal_test = np.array([get_embedding(w) for w in normal_test])
X_anomaly_valid = np.array([get_embedding(w) for w in anomaly_valid])
X_anomaly_test = np.array([get_embedding(w) for w in anomaly_test])
解説:
- 単語をLLMに入力し、最後の隠れ層出力を平均化して高次元ベクトル(埋め込み)を取得します。
- 全データに対して同様の手順で埋め込みを計算し、後続の次元削減や異常検知処理に利用します。
- データ件数が膨大な場合は、埋め込みを一度ファイルに保存し、別のNotebookで解析を行うと、処理の分離や再実行時の効率化が図れます。今回はサンプル数が少ないため、保存ステップは省略します。
5. UMAPで次元削減と学習
reducer = umap.UMAP(n_neighbors=15, min_dist=0.1, n_components=2, random_state=RANDOM_STATE)
reducer.fit(X_normal_train) # 正常データでUMAPを学習
# 学習データは必要に応じて2次元空間で視覚化用にも保持
X_normal_train_2d = reducer.transform(X_normal_train)
解説:
- 正常な学習用データをUMAPに入力し、2次元の低次元空間を学習します。
- このUMAPモデルは、ValidationおよびTestデータの低次元化・再構成に用いて異常度評価を行います。
- 異常検知においては、まず2次元など低次元で可視化しやすい設定から始めると有用です。可視化を通じて異常データが分離しやすい次元数を探ることで、最適な空間表現の獲得を目指せます。
6. Validationデータで再構成誤差に基づく異常度算出と閾値決定
# Validationデータの低次元化
X_valid_all = np.vstack([X_normal_valid, X_anomaly_valid])
y_valid = np.array([0]*len(X_normal_valid) + [1]*len(X_anomaly_valid))
X_valid_2d = reducer.transform(X_valid_all)
# 再構成(inverse_transform)で元空間へ戻す
X_valid_reconstructed = reducer.inverse_transform(X_valid_2d)
# 再構成誤差(MSE)を異常度とする関数
def reconstruction_error(X_original, X_reconstructed):
return np.mean((X_original - X_reconstructed)**2, axis=1)
valid_errors = reconstruction_error(X_valid_all, X_valid_reconstructed)
# Validation上で閾値探索
best_f1 = -1
best_thresh = None
err_min, err_max = np.min(valid_errors), np.max(valid_errors)
for thresh in np.linspace(err_min, err_max, 100):
y_pred = (valid_errors > thresh).astype(int)
f1 = f1_score(y_valid, y_pred)
if f1 > best_f1:
best_f1 = f1
best_thresh = thresh
print(f"Best Threshold: {best_thresh:.4f}, Validation F1: {best_f1:.4f}")
Best Threshold: 1.1729, Validation F1: 0.9375
解説:
- Validationセットに対してUMAPで2次元化→inverse_transformで高次元空間に再構成し、元の埋め込みとのMSE誤差を計算。
- 再構成誤差が大きいほど「異常」とみなしやすく、F1スコアを最大化する閾値を決定します。
7. テストデータで評価
# Testデータの低次元化と再構成
X_test_all = np.vstack([X_normal_test, X_anomaly_test])
y_test = np.array([0]*len(X_normal_test) + [1]*len(X_anomaly_test))
X_test_2d = reducer.transform(X_test_all)
X_test_reconstructed = reducer.inverse_transform(X_test_2d)
test_errors = reconstruction_error(X_test_all, X_test_reconstructed)
y_pred_test = (test_errors > best_thresh).astype(int)
f1 = f1_score(y_test, y_pred_test)
roc_auc = roc_auc_score(y_test, test_errors)
pr_auc = average_precision_score(y_test, test_errors)
print("Test Evaluation:")
print(f"F1 Score: {f1:.4f}")
print(f"ROC AUC: {roc_auc:.4f}")
print(f"PR AUC: {pr_auc:.4f}")
Test Evaluation:
F1 Score: 0.9375
ROC AUC: 0.9689
PR AUC: 0.9632
解説:
- Testデータでも同様に再構成誤差を算出し、Validationで決定した閾値で異常判定を行います。
- その後、F1、ROC AUC、PR AUCを計算してモデル性能を評価します。
8. 可視化
def visualize_2d_data(normal_points, anomaly_points, normal_labels, anomaly_labels, title="UMAP Visualization"):
fig = go.Figure()
fig.add_trace(go.Scatter(
x=normal_points[:,0], y=normal_points[:,1],
mode='markers',
marker=dict(color='rgba(0, 255, 0, 0.8)', opacity=0.5),
name='Normal',
text=normal_labels,
hoverinfo='text'
))
fig.add_trace(go.Scatter(
x=anomaly_points[:,0], y=anomaly_points[:,1],
mode='markers',
marker=dict(color='rgba(255, 0, 0, 0.8)', opacity=0.5),
name='Anomaly',
text=anomaly_labels,
hoverinfo='text'
))
fig.update_layout(
title=title,
xaxis_title='UMAP Component 1',
yaxis_title='UMAP Component 2',
showlegend=True,
hovermode='closest'
)
fig.show()
# Validationセット可視化
normal_valid_2d = X_valid_2d[:len(X_normal_valid)]
anomaly_valid_2d = X_valid_2d[len(X_normal_valid):]
visualize_2d_data(normal_valid_2d, anomaly_valid_2d, normal_valid, anomaly_valid, title="Validation Distribution")
# Testセット可視化
normal_test_2d = X_test_2d[:len(X_normal_test)]
anomaly_test_2d = X_test_2d[len(X_normal_test):]
visualize_2d_data(normal_test_2d, anomaly_test_2d, normal_test, anomaly_test, title="Test Distribution")
解説:
- Plotlyを用いて、UMAP空間上における正常・異常サンプルの分布をプロットします。
- ValidationおよびTestセットの分布を直感的に確認することで、モデルがどのような特徴空間を「正常」と見なし、どの領域を「異常」と判断しているのかを把握できます。
- 正常・異常が明瞭に分離されていれば、低次元空間における異常検知が有効に機能していることが期待できます。もし分離が不明瞭なら、次元数の再検討や他の手法・パラメータ調整を検討する余地があります。
おわりに
本記事では、LLMを用いたテキスト埋め込みからUMAPによる次元削減、そしてinverse_transform
による再構成誤差を異常度指標とする異常検知手法を実装・解説しました。Validationセットを活用して異常検知の閾値を合理的に定められる点や、直感的な可視化が容易な点が大きな利点です。
今後の発展としては、たとえば以下が考えられます。
- UMAPのパラメーターのチューニングや他の次元削減手法(PCA、t-SNEなど)との比較
- 再構成誤差以外の指標(コサイン類似度やより洗練された再構成モデルなど)との組み合わせ
- 他のLLM(たとえば先日リリースされたllama3.3)、LLM以外の埋め込みモデル(BERT, RoBERTa, Vicuna等)の活用
- クラス不均衡の解消、大規模データセットへの拡張
- 次元削減の次元数などのパラメーター調整
- UMAPの学習時に、異常データの一部も学習させる方法の検討(参考記事:https://blog.atusy.net/2020/09/02/umap-outlier/)
参考にしたもの