はじめに
異常検知モデルのIsolation Forestを触ってみたので、Isolation Forestの大まかな仕組みとPythonで分析した流れを説明します。
異常検知とは
異常検知は、データの中から正常なパターンと異なる異常なデータを識別する技術です。検知したい異常データ数が少なく、正常データ数が多い場合などに用いられます。
異常検知には様々なモデルがありますが、今回は比較的計算量が軽いとされるIsolation Forestを使用してみます。
Isolation Treeとは
Isolation Treeとは、ランダムに特徴量と分割値を選んで、データが孤立する(1つになる)まで分割を繰り返す木のことです。
以下のいずれかの条件を満たすまで分割を繰り返すことで構築されます。
- 木の高さ制限に達した場合
- ノード内のデータが1件になった(孤立した)場合
- ノード内のデータがすべて同じ値でこれ以上分割できない状態になった場合
Isolation Forestとは
Isolation Forestは異常検知アルゴリズムの一種で、基本構造にはIsolation Treeが使われています。1本の木では結果に偏りが出るため、複数の木を生成して学習し異常スコアの計算を行います。
異常スコアの計算には異常データが持つ 「数は少なく、正常なデータとは値が大きく異なる」 という特性を利用します。
この特性がある場合、異常データは他のデータから離れているため、少ない回数の分割で孤立します。一方で、正常データは密集しているため、孤立させるには多くの分割を必要とします。
分割のイメージ
上記のように、異常なデータは正常データよりも少ない分割で孤立すると考えて、学習を行います。
※上記はイメージ図であり、実際の学習データとは異なります
異常検知を実施する
実際にIsolation Forestを使って異常検知を行います。
使用データ
scikit-learnに含まれる乳がんデータセット(Breast Cancer Wisconsin Dataset)を使用します。このデータセットを使って、Isolation Forestが悪性腫瘍を異常として検出できるかを検証します。
データは以下のようになっています:
- サンプル数: 569件
- 悪性(異常): 212件
- 良性(正常): 357件
- 特徴量: 30個
- クラス: 良性と悪性の2クラス
実行環境
以下の環境で実行しました:
- Python 3.12
- scikit-learn 1.8.0
分析の流れ
データセット分割
データは学習データを8割、テストデータを2割に分割して使用しました。なお、学習データは分割した8割の中からさらに正常データのみに絞って使用しています。
- 学習データ: 285件 (正常データのみ)
- テストデータ: 114件
- 悪性(異常): 42件
- 良性(正常): 72件
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
data = load_breast_cancer()
X, y = data.data, data.target
# データを学習用とテスト用に分割
X_train_all, X_test, y_train_all, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# 学習データからは正常データ(良性)のみを使用
X_train = X_train_all[y_train_all == 1]
Isolation Forestモデルの構築と学習
モデルは以下のパラメータで構築しました。
from sklearn.ensemble import IsolationForest
model = IsolationForest(
n_estimators=100,
contamination='auto',
random_state=42
)
# 正常データ(良性)のみで学習
model.fit(X_train)
テストデータで異常検知
以下のようなコードでテストデータに対する予測を行いました。
# テストデータで予測
predictions = model.predict(X_test) # -1: 異常, 1: 正常
scores = model.score_samples(X_test) # 異常スコア(負の値、小さいほど異常)
分析結果
テストデータ114件に対する予測結果は以下になります:
- 異常と判定: 78件
- 正常と判定: 36件
混同行列
| 予測: 悪性 | 予測: 良性 | |
|---|---|---|
| 実際: 悪性 | 42 | 0 |
| 実際: 良性 | 36 | 36 |
評価指標
| 指標 | 値 | 説明 |
|---|---|---|
| Accuracy | 0.684 | 全体の正解率 |
| Precision (悪性) | 0.54 | 悪性と予測したうち実際に悪性だった割合 |
| Recall (悪性) | 1.00 | 実際の悪性のうち正しく検出できた割合 |
| F1-score (悪性) | 0.70 | PrecisionとRecallの調和平均 |
| Precision (良性) | 1.00 | 良性と予測したうち実際に良性だった割合 |
| Recall (良性) | 0.50 | 実際の良性のうち正しく検出できた割合 |
まとめ
今回はIsolation Forestを使って異常検知を行ってみました。特に前処理を行っていないデータかつ正常データのみの学習でも、異常を見逃すことなくすべて検出することができました(Recall (悪性)が100%)。しかし、良性なのに悪性と判定されたケースが多い(偽陽性が高い)結果となっており、パラメータの調整やデータの前処理の重要性も感じました。
参考サイト
補足
異常スコアの定義
異常スコアは以下のように定義されます。
$$
s(x, n) = 2^{-\frac{E(h(x))}{c(n)}}
$$
-
h(x): データxが孤立するまでのパスの長さ(枝の数) -
E(h(x)): 全決定木でのパスの長さ(枝の数)の平均 -
c(n): サブサンプルサイズnに基づく正規化定数
c(n)はランダムに木を作ったら、平均的にこれくらいの深さでデータは孤立するはずだということを表すようです。そのため、上記の異常スコアの $\frac{E(h(x))}{c(n)}$ は値が小さいほど、平均的な木よりも深さが浅いことを示します。それに、符号を反転させて指数として表すことで、0~1の間の値を取るようにしています。($\frac{E(h(x))}{c(n)}$ が大きければ0に近づき、小さければ1に近づく)
なお、scikit-learnのIsolation Forestが返す値は、原論文の定義とは符号が反転しています。原論文では大きいほど異常ですが、scikit-learnでは小さいほど異常となっています。
c(n)の中身
$c(n)$は以下のように定義されるようです。
こちらに関する詳細な説明が難しいため、本記事では省略します。
$$
c(n) = 2H(n-1) - \dfrac{2(n-1)}{n}
$$
- $H(i) \approx \ln(i) + \gamma$: 調和数の近似
- $\gamma$: オイラー・マスケローニ定数(≈ 0.5772)
使用したコード
使用したコード
"""
Isolation Forestを使った乳がんデータセットの異常検知テスト
悪性腫瘍を「異常」として検出できるかを検証
"""
import numpy as np
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.ensemble import IsolationForest
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
classification_report,
confusion_matrix,
accuracy_score
)
import matplotlib.pyplot as plt
import seaborn as sns
def load_and_prepare_data():
"""
乳がんデータセットの読み込みと準備
学習データとテストデータに分割
Returns:
X_train: 学習用特徴量データ (正常データのみ)
X_test: テスト用特徴量データ
y_test: テスト用ラベルデータ (0: 悪性(異常), 1: 良性(正常))
feature_names: 特徴量名
"""
print("=" * 60)
print("1. データの読み込みと分割")
print("=" * 60)
data = load_breast_cancer()
X = data.data
y = data.target # 0: malignant(悪性), 1: benign(良性)
print(f"全データ数: {len(X)}")
print(f"特徴量数: {X.shape[1]}")
print(f"悪性(異常): {np.sum(y == 0)}件 ({np.sum(y == 0) / len(y) * 100:.2f}%)")
print(f"良性(正常): {np.sum(y == 1)}件 ({np.sum(y == 1) / len(y) * 100:.2f}%)")
print()
# データを学習用とテスト用に分割 (80%:20%)
X_train_all, X_test, y_train_all, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
# 学習データからは正常データ(良性)のみを使用
X_train = X_train_all[y_train_all == 1] # 良性のみ
print("【データ分割結果】")
print(f"学習データ: {len(X_train)}件 (正常データのみ)")
print(f"テストデータ: {len(X_test)}件")
print(f" - テスト: 悪性(異常) {np.sum(y_test == 0)}件")
print(f" - テスト: 良性(正常) {np.sum(y_test == 1)}件")
print()
return X_train, X_test, y_test, data.feature_names
def train_isolation_forest(X_train, X_test, contamination='auto'):
"""
Isolation Forestモデルの学習と予測
Args:
X_train: 学習用特徴量データ (正常データのみ)
X_test: テスト用特徴量データ
contamination: 異常データの割合の目安 (デフォルト: 'auto')
実際のユースケースでは業務知識から設定する。
テストデータのラベルから計算するとデータ漏洩になるため注意。
Returns:
model: 学習済みモデル
predictions: 予測結果 (-1: 異常, 1: 正常)
scores: 異常スコア(負の値、小さいほど異常)
"""
print("=" * 60)
print("2. Isolation Forestモデルの学習")
print("=" * 60)
print(f"contamination(異常割合): {contamination}")
# モデルの構築
model = IsolationForest(
n_estimators=100,
max_samples='auto',
contamination=contamination,
random_state=42,
verbose=0
)
# 学習 (正常データのみで学習)
print("正常データのみでモデルを学習中...")
model.fit(X_train)
# テストデータで予測
print("テストデータで予測中...")
predictions = model.predict(X_test) # -1: 異常, 1: 正常
scores = model.score_samples(X_test) # 異常スコア(小さいほど異常)
print(f"\n【テストデータの予測結果】")
print(f"異常と判定: {np.sum(predictions == -1)}件")
print(f"正常と判定: {np.sum(predictions == 1)}件")
print()
return model, predictions, scores
def evaluate_model(y_true, y_pred, scores):
"""
モデルの評価
Args:
y_true: 真のラベル (0: 悪性, 1: 良性)
y_pred: 予測ラベル (-1: 異常, 1: 正常)
scores: 異常スコア
"""
print("=" * 60)
print("3. モデルの評価")
print("=" * 60)
# 予測を0/1に変換 (-1 -> 0(異常), 1 -> 1(正常))
y_pred_binary = np.where(y_pred == -1, 0, 1)
# 混同行列
cm = confusion_matrix(y_true, y_pred_binary)
print("混同行列:")
print(" 予測: 悪性 予測: 良性")
print(f"実際: 悪性 {cm[0][0]:4d} {cm[0][1]:4d}")
print(f"実際: 良性 {cm[1][0]:4d} {cm[1][1]:4d}")
print()
# 精度指標
accuracy = accuracy_score(y_true, y_pred_binary)
print(f"Accuracy(精度): {accuracy:.4f}")
print()
# 詳細レポート
print("Classification Report:")
target_names = ['悪性(異常)', '良性(正常)']
print(classification_report(y_true, y_pred_binary, target_names=target_names))
return y_pred_binary, cm
def visualize_results(y_true, scores, cm):
"""
結果の可視化
Args:
y_true: 真のラベル
scores: 異常スコア
cm: 混同行列
"""
print("=" * 60)
print("4. 結果の可視化")
print("=" * 60)
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
# 1. 混同行列のヒートマップ
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[0])
axes[0].set_title('混同行列', fontsize=14, pad=10)
axes[0].set_xlabel('予測ラベル', fontsize=12)
axes[0].set_ylabel('真のラベル', fontsize=12)
axes[0].set_xticklabels(['悪性', '良性'])
axes[0].set_yticklabels(['悪性', '良性'])
# 2. 異常スコアの分布
malignant_scores = -scores[y_true == 0] # 悪性(異常)
benign_scores = -scores[y_true == 1] # 良性(正常)
axes[1].hist(malignant_scores, bins=50, alpha=0.6, label='悪性(異常)', color='red')
axes[1].hist(benign_scores, bins=50, alpha=0.6, label='良性(正常)', color='blue')
axes[1].set_title('異常スコアの分布', fontsize=14, pad=10)
axes[1].set_xlabel('異常スコア(高いほど異常)', fontsize=12)
axes[1].set_ylabel('頻度', fontsize=12)
axes[1].legend()
axes[1].grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('./results.png', dpi=300, bbox_inches='tight')
print("結果を 'results.png' に保存しました")
plt.show()
def analyze_feature_importance(model, X_test, y_test, feature_names):
"""
特徴量の重要度を分析(簡易版)
Args:
model: 学習済みモデル
X_test: テストデータの特徴量
y_test: テストデータのラベル
feature_names: 特徴量名
"""
print("=" * 60)
print("5. 特徴量の分析 (テストデータ)")
print("=" * 60)
# 悪性と良性の平均値を比較
malignant_mean = X_test[y_test == 0].mean(axis=0)
benign_mean = X_test[y_test == 1].mean(axis=0)
diff = np.abs(malignant_mean - benign_mean)
# 差が大きい順にソート
top_indices = np.argsort(diff)[::-1][:10]
print("悪性と良性で差が大きい特徴量 Top 10:")
for i, idx in enumerate(top_indices, 1):
print(f"{i:2d}. {feature_names[idx]:30s} "
f"(悪性平均: {malignant_mean[idx]:8.2f}, "
f"良性平均: {benign_mean[idx]:8.2f}, "
f"差: {diff[idx]:8.2f})")
print()
def main():
"""
メイン処理
"""
print("\n" + "=" * 60)
print("Isolation Forestによる乳がん異常検知テスト")
print("正常データ(良性)のみで学習し、テストデータで異常検知")
print("=" * 60 + "\n")
# 1. データの読み込みと分割
X_train, X_test, y_test, feature_names = load_and_prepare_data()
# 2. モデルの学習と予測
model, predictions, scores = train_isolation_forest(X_train, X_test)
# 3. モデルの評価
y_pred_binary, cm = evaluate_model(y_test, predictions, scores)
# 4. 結果の可視化
visualize_results(y_test, scores, cm)
# 5. 特徴量の分析
analyze_feature_importance(model, X_test, y_test, feature_names)
print("=" * 60)
print("テスト完了!")
print("=" * 60)
if __name__ == "__main__":
main()