0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MCPの活用や応用への考察 - MCP監査ログを活用した機械学習による不正検知システムの構築

Posted at

はじめに

Model Context Protocol (MCP)は、LLMアプリケーションと外部システムを接続する標準化されたプロトコルです。本記事では、MCP監査ログを活用し、機械学習による不正利用パターンの検知システムを構築する方法を解説します。

この手法により、従来の静的なルールベース検知では困難だった、未知の脅威や巧妙な攻撃パターンを検出できるようになります。

💡 MCP監査ログの特性と価値

MCPログに記録される情報

MCP監査ログには以下のような情報が記録されます:

  • Tool実行履歴: どのToolが、誰によって、いつ実行されたか
  • リソースアクセス: どのファイルやデータにアクセスしたか
  • 認証・認可イベント: 成功/失敗した認証試行
  • プロンプトとコンテキスト: LLMに渡された入力情報
  • 実行結果: Tool実行の成否と出力

これらのログは、ユーザー行動の全体像を把握するための重要なデータソースとなります。

🔍 特徴量エンジニアリング

機械学習モデルの性能は、適切な特徴量設計に大きく依存します。以下、MCPログから抽出すべき特徴量を紹介します。

1. Tool利用パターンの特徴量

特徴量 計算方法 検知できる脅威
Tool実行頻度の偏差 時間窓内の実行回数と過去平均の差分 データ流出の大量試行
危険なTool使用率 delete_file, execute_code等の割合 破壊的な操作の集中
Tool呼び出しシーケンス N-gram分析による異常な操作順序 攻撃スクリプトによる自動化
失敗率の急変 連続成功/失敗の統計的変化 ブルートフォース攻撃

実装例(Python):

import pandas as pd
import numpy as np
from collections import Counter

def extract_tool_features(logs_df, user_id, window_hours=24):
    """Tool利用パターンの特徴量を抽出"""
    user_logs = logs_df[logs_df['user_id'] == user_id]
    recent_logs = user_logs[user_logs['timestamp'] > 
                            pd.Timestamp.now() - pd.Timedelta(hours=window_hours)]
    
    # 危険なToolのリスト
    risky_tools = ['delete_file', 'execute_code', 'download_data']
    
    features = {
        'tool_count': len(recent_logs),
        'unique_tools': recent_logs['tool_name'].nunique(),
        'risky_tool_ratio': (recent_logs['tool_name'].isin(risky_tools)).mean(),
        'failure_rate': (recent_logs['status'] == 'failed').mean(),
        'avg_execution_time': recent_logs['execution_time'].mean()
    }
    
    # Tool使用頻度の偏差(Z-score)
    hist_mean = user_logs.groupby(pd.Grouper(key='timestamp', freq='D')).size().mean()
    hist_std = user_logs.groupby(pd.Grouper(key='timestamp', freq='D')).size().std()
    current_count = len(recent_logs)
    features['frequency_zscore'] = (current_count - hist_mean) / (hist_std + 1e-5)
    
    return features

2. コンテキストアクセスの特徴量

特徴量 説明 異常の指標
機密度スコアの平均 アクセスしたリソースの機密レベル 通常より高機密データへのアクセス増加
アクセス範囲の多様性 Shannon Entropy等で計算 通常と異なる広範なデータアクセス
初回アクセスリソース率 過去にアクセスしたことがないリソースの割合 探索的な不正アクセス

3. ユーザー行動特性の特徴量

特徴量 計算方法 検知対象
アクセス元の地理的偏差 IPアドレスの通常位置からの距離 認証情報の窃取・不正利用
アクセス時刻の異常度 ユーザーの通常活動時間との乖離 深夜の不審なアクセス
セッション時間の異常 通常のセッション長との比較 自動化されたボット攻撃

🧠 機械学習モデルの設計

なぜ異常検知(Anomaly Detection)なのか

不正利用検知には以下の特性があります:

  1. 不正事例が極端に少ない(全体の0.1%未満)
  2. 攻撃手法が常に進化する(未知のパターンへの対応必須)
  3. 正常な行動は定義しやすい

これらの理由から、教師あり学習よりも異常検知アルゴリズムが適しています。

推奨モデルの比較

1. Isolation Forest

特徴:

  • ランダムに分割して「孤立しやすいサンプル」を異常と判定
  • 高速で、高次元データに強い
  • 解釈性が比較的高い

適用シーン:

  • 多様な特徴量を同時に扱う総合的な異常検知
  • リアルタイム処理が必要な場合

実装例:

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import joblib

class MCPAnomalyDetector:
    def __init__(self, contamination=0.01):
        """
        contamination: 異常データの想定割合(通常は0.01-0.05)
        """
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            contamination=contamination,
            random_state=42,
            n_estimators=100
        )
    
    def train(self, normal_logs_df):
        """正常なログデータで学習"""
        features = self._extract_features(normal_logs_df)
        X = self.scaler.fit_transform(features)
        self.model.fit(X)
    
    def predict_anomaly_score(self, log_entry):
        """異常スコアを計算(0-1の範囲、1が最も異常)"""
        features = self._extract_features(log_entry)
        X = self.scaler.transform(features)
        
        # Isolation Forestのスコアを0-1に正規化
        score = self.model.score_samples(X)[0]
        normalized_score = 1 / (1 + np.exp(score))  # Sigmoid変換
        
        return normalized_score
    
    def _extract_features(self, logs):
        """ログから特徴量を抽出(前述の関数を使用)"""
        # 実装は省略
        pass

2. One-Class SVM

特徴:

  • 正常データの境界を学習
  • ノイズに比較的頑健
  • カーネル関数で非線形パターンに対応

適用シーン:

  • 特定ユーザー専用のプロファイル構築
  • 特徴量が比較的少ない(<20次元)場合

3. Autoencoder

特徴:

  • ニューラルネットワークでデータを圧縮・復元
  • 復元誤差が大きいものを異常と判定
  • 複雑な時系列パターンに対応可能

適用シーン:

  • Tool呼び出しの時系列シーケンス分析
  • 大量のログデータが利用可能な場合

実装例(LSTM Autoencoder):

import tensorflow as tf
from tensorflow import keras

def build_lstm_autoencoder(sequence_length, n_features):
    """時系列異常検知用のLSTM Autoencoder"""
    
    # エンコーダー
    encoder_inputs = keras.Input(shape=(sequence_length, n_features))
    encoder_lstm = keras.layers.LSTM(64, activation='relu', return_sequences=True)(encoder_inputs)
    encoder_lstm = keras.layers.LSTM(32, activation='relu')(encoder_lstm)
    
    # デコーダー
    decoder_lstm = keras.layers.RepeatVector(sequence_length)(encoder_lstm)
    decoder_lstm = keras.layers.LSTM(32, activation='relu', return_sequences=True)(decoder_lstm)
    decoder_lstm = keras.layers.LSTM(64, activation='relu', return_sequences=True)(decoder_lstm)
    decoder_outputs = keras.layers.TimeDistributed(keras.layers.Dense(n_features))(decoder_lstm)
    
    # モデルの構築
    autoencoder = keras.Model(encoder_inputs, decoder_outputs)
    autoencoder.compile(optimizer='adam', loss='mse')
    
    return autoencoder

# 異常スコアの計算
def calculate_reconstruction_error(model, sequence):
    """復元誤差を異常スコアとして計算"""
    reconstruction = model.predict(sequence)
    mse = np.mean(np.square(sequence - reconstruction), axis=(1, 2))
    return mse

時系列分析の統合

異常スコアは単一時点だけでなく、時系列トレンドで評価すべきです。

EWMA(指数加重移動平均)の活用:

def calculate_ewma_anomaly(scores, alpha=0.3):
    """
    EWMAで異常スコアのトレンドを追跡
    
    alpha: 重み係数(0-1、大きいほど最近のデータを重視)
    """
    ewma = [scores[0]]
    for score in scores[1:]:
        ewma.append(alpha * score + (1 - alpha) * ewma[-1])
    
    # EWMAからの乖離が大きいものを検知
    deviations = [abs(s - e) for s, e in zip(scores, ewma)]
    
    return ewma, deviations

⚙️ 運用とセキュリティレスポンス

1. リアルタイム検知パイプライン

MCPログ生成 → 特徴量抽出 → 異常スコア算出 → しきい値判定 → 自動レスポンス
     ↓                                                      ↓
 ログストア                                          アラート送信

アーキテクチャ例:

import asyncio
from datetime import datetime

class RealTimeDetectionPipeline:
    def __init__(self, detector, alert_system):
        self.detector = detector
        self.alert_system = alert_system
        self.score_history = {}
    
    async def process_log_entry(self, log_entry):
        """ログエントリをリアルタイム処理"""
        user_id = log_entry['user_id']
        
        # 1. 異常スコア計算
        anomaly_score = self.detector.predict_anomaly_score(log_entry)
        
        # 2. 履歴に記録
        if user_id not in self.score_history:
            self.score_history[user_id] = []
        self.score_history[user_id].append({
            'timestamp': datetime.now(),
            'score': anomaly_score,
            'log': log_entry
        })
        
        # 3. レスポンスレベルの判定
        response = self._determine_response(anomaly_score, user_id)
        
        # 4. アクション実行
        await self._execute_response(response, user_id, log_entry)
        
        return anomaly_score
    
    def _determine_response(self, score, user_id):
        """スコアに基づいてレスポンスレベルを決定"""
        if score > 0.9:
            return 'CRITICAL'
        elif score > 0.8:
            return 'WARNING'
        elif score > 0.7:
            return 'MONITORING'
        else:
            return 'NORMAL'
    
    async def _execute_response(self, level, user_id, log_entry):
        """レスポンスレベルに応じたアクションを実行"""
        if level == 'CRITICAL':
            # セッション強制終了
            await self._terminate_session(user_id)
            await self.alert_system.send_critical_alert(user_id, log_entry)
        
        elif level == 'WARNING':
            # レートリミット適用
            await self._apply_rate_limit(user_id)
            await self.alert_system.send_warning(user_id, log_entry)
        
        elif level == 'MONITORING':
            # 詳細ログ記録を有効化
            await self._enable_verbose_logging(user_id)

2. 介入レベルの設定

異常スコア レベル 自動レスポンス 人間の介入
0.7 < S ≤ 0.8 注意 詳細ログ記録の有効化 通知のみ(低優先度)
0.8 < S ≤ 0.9 警告 レートリミット適用
・Tool実行に遅延追加
・リクエスト数制限
SecOpsへアラート
ログ精査を要請
S > 0.9 緊急 セッション強制終了
アカウント一時停止
全ての操作をブロック
緊急インシデント対応
即座に調査開始

しきい値の調整指針:

from sklearn.metrics import precision_recall_curve

def optimize_threshold(y_true, anomaly_scores):
    """精度-再現率曲線から最適なしきい値を決定"""
    precision, recall, thresholds = precision_recall_curve(y_true, anomaly_scores)
    
    # F1スコアが最大となるしきい値を選択
    f1_scores = 2 * (precision * recall) / (precision + recall + 1e-5)
    optimal_idx = np.argmax(f1_scores)
    
    return thresholds[optimal_idx], f1_scores[optimal_idx]

3. フィードバックループの構築

検知システムは継続的に改善する必要があります。

ステップ1: ラベリングと検証

class FeedbackLoop:
    def __init__(self, detector):
        self.detector = detector
        self.labeled_incidents = []
    
    def add_feedback(self, log_entry, is_malicious, severity, notes):
        """セキュリティチームからのフィードバックを記録"""
        self.labeled_incidents.append({
            'log': log_entry,
            'label': 1 if is_malicious else 0,
            'severity': severity,
            'notes': notes,
            'timestamp': datetime.now()
        })
    
    def retrain_model(self, min_samples=100):
        """十分なフィードバックが集まったらモデルを再訓練"""
        if len(self.labeled_incidents) < min_samples:
            return False
        
        # フィードバックデータを訓練セットに統合
        X, y = self._prepare_training_data()
        
        # 半教師あり学習でモデル更新
        self.detector.partial_fit(X, y)
        
        return True

ステップ2: ポリシー自動生成

def generate_policy_recommendation(incident_logs):
    """不正パターンからABACポリシーを自動生成"""
    common_patterns = analyze_common_patterns(incident_logs)
    
    policies = []
    for pattern in common_patterns:
        if pattern['type'] == 'time_based':
            policies.append({
                'condition': f"access_time NOT IN {pattern['normal_hours']}",
                'action': 'DENY',
                'resource': pattern['resources'],
                'confidence': pattern['frequency']
            })
        
        elif pattern['type'] == 'tool_sequence':
            policies.append({
                'condition': f"tool_sequence MATCHES {pattern['sequence']}",
                'action': 'REQUIRE_MFA',
                'confidence': pattern['frequency']
            })
    
    return policies

📊 評価指標とモニタリング

モデル性能の評価

異常検知では、一般的な分類指標に加えて以下を重視します:

  • 精度(Precision): アラートの信頼性(誤検知率の低さ)
  • 再現率(Recall): 実際の脅威の捕捉率
  • 検知までの時間: 攻撃開始から検知までの平均時間
def calculate_detection_metrics(y_true, y_pred, detection_times):
    """検知システムの性能指標を計算"""
    from sklearn.metrics import precision_score, recall_score, f1_score
    
    metrics = {
        'precision': precision_score(y_true, y_pred),
        'recall': recall_score(y_true, y_pred),
        'f1': f1_score(y_true, y_pred),
        'avg_detection_time': np.mean(detection_times),
        'false_positive_rate': np.sum((y_pred == 1) & (y_true == 0)) / np.sum(y_true == 0)
    }
    
    return metrics

運用ダッシュボード

Grafana等で以下を可視化します:

  • リアルタイム異常スコアの推移
  • アラート発生頻度とレベル分布
  • モデル性能指標のトレンド
  • 誤検知/検知漏れの統計

実装の段階的アプローチ

Phase 1: 基礎構築(1-2ヶ月)

  1. ログ収集基盤の整備
  2. 基本的な特徴量抽出の実装
  3. Isolation Forestによる単純な異常検知

Phase 2: 高度化(2-3ヶ月)

  1. 時系列分析の統合
  2. ユーザー別プロファイルの構築
  3. 自動レスポンス機能の実装

Phase 3: 自律化(継続的)

  1. フィードバックループの運用
  2. ポリシー自動生成機能
  3. モデルの継続的改善

まとめ

MCP監査ログを活用した機械学習ベースの不正検知システムは、以下の利点を提供します:

  • 未知の脅威への対応: 既知のパターンに依存しない検知
  • 継続的な改善: フィードバックループによる学習
  • 自動化されたレスポンス: 人間の介入前の初動対応

一方で、以下の課題にも注意が必要です:

  • 誤検知の管理: 過度なアラートによる警告疲れ
  • モデルの解釈性: 検知理由の説明責任
  • 計算リソース: リアルタイム処理のコスト

これらのバランスを取りながら、段階的に実装を進めることで、堅牢なセキュリティシステムを構築できます。

参考資料


注意: MCPはAnthropicが開発した比較的新しいプロトコルです。最新の情報については、公式ドキュメントを参照してください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?