0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MCPを活用したコンテンツアクセス履歴の管理とセキュリティ強化

Posted at

はじめに

Model Context Protocol (MCP)は、AIアプリケーションとデータソース間の標準化されたインターフェースを提供します。本記事では、MCPの実際の機能を活用して、コンテンツアクセスの透明性を向上させ、セキュリティを強化する実践的な方法を解説します。

MCPにおけるアクセス制御とログ管理

MCPの実際の機能

MCPは以下の標準化されたインターフェースを提供します:

  • Resources: ファイル、データベース、APIエンドポイントへのアクセス
  • Tools: AIが実行可能な機能
  • Prompts: 再利用可能なプロンプトテンプレート

これらのやり取りは全て標準化された形式で記録でき、透明性の高いアクセス管理が可能です。

セキュアなMCPサーバーの実装

import json
import hashlib
import datetime
from typing import Dict, List, Optional
from pathlib import Path
import logging

class SecureMCPServer:
    def __init__(self, config_path: str):
        self.config = self.load_config(config_path)
        self.access_logs: List[Dict] = []
        self.setup_logging()
        
    def setup_logging(self):
        """ログ設定の初期化"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('mcp_access.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def authenticate_client(self, client_id: str, api_key: str) -> bool:
        """クライアント認証"""
        expected_hash = self.config.get('clients', {}).get(client_id)
        if not expected_hash:
            return False
        
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        return key_hash == expected_hash
    
    def authorize_resource_access(self, client_id: str, resource_uri: str) -> bool:
        """リソースアクセス権限の確認"""
        client_permissions = self.config.get('permissions', {}).get(client_id, [])
        
        # パターンマッチングでリソースアクセス権限を確認
        for pattern in client_permissions:
            if self.match_resource_pattern(resource_uri, pattern):
                return True
        return False
    
    def match_resource_pattern(self, resource_uri: str, pattern: str) -> bool:
        """リソースパターンマッチング"""
        # 簡単なワイルドカード実装
        import fnmatch
        return fnmatch.fnmatch(resource_uri, pattern)
    
    def log_access(self, client_id: str, resource_uri: str, 
                   action: str, success: bool, context: Optional[str] = None):
        """アクセスログの記録"""
        log_entry = {
            'timestamp': datetime.datetime.now().isoformat(),
            'client_id': client_id,
            'resource_uri': resource_uri,
            'action': action,
            'success': success,
            'context': context,
            'session_id': getattr(self, 'current_session_id', None)
        }
        
        self.access_logs.append(log_entry)
        self.logger.info(f"Access: {client_id} -> {resource_uri} ({action}) - {'SUCCESS' if success else 'DENIED'}")
        
        # 永続化
        self.persist_log(log_entry)
    
    def persist_log(self, log_entry: Dict):
        """ログの永続化"""
        log_file = Path('access_logs.jsonl')
        with log_file.open('a') as f:
            f.write(json.dumps(log_entry) + '\n')
    
    def get_resource(self, client_id: str, resource_uri: str, 
                     context: Optional[str] = None) -> Optional[Dict]:
        """セキュアなリソース取得"""
        
        # 認証・認可チェック
        if not self.authorize_resource_access(client_id, resource_uri):
            self.log_access(client_id, resource_uri, 'GET', False, context)
            return None
        
        try:
            # リソース取得ロジック
            resource_content = self.load_resource(resource_uri)
            self.log_access(client_id, resource_uri, 'GET', True, context)
            
            return {
                'uri': resource_uri,
                'content': resource_content,
                'checksum': self.calculate_checksum(resource_content),
                'accessed_at': datetime.datetime.now().isoformat()
            }
        except Exception as e:
            self.log_access(client_id, resource_uri, 'GET', False, f"Error: {str(e)}")
            return None
    
    def calculate_checksum(self, content: str) -> str:
        """コンテンツのチェックサム計算"""
        return hashlib.sha256(content.encode()).hexdigest()
    
    def load_resource(self, resource_uri: str) -> str:
        """リソースの実際の読み込み"""
        # 実装例:ファイルシステムからの読み込み
        if resource_uri.startswith('file://'):
            file_path = resource_uri[7:]  # 'file://' を除去
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        else:
            raise ValueError(f"Unsupported resource URI: {resource_uri}")

# 使用例
config = {
    'clients': {
        'ai_assistant_1': hashlib.sha256('secret_key_123'.encode()).hexdigest()
    },
    'permissions': {
        'ai_assistant_1': [
            'file://content/blog/*.md',
            'file://content/docs/*.txt'
        ]
    }
}

server = SecureMCPServer('config.json')

アクセス履歴の分析とモニタリング

セキュリティイベントの検知

class SecurityMonitor:
    def __init__(self, log_file_path: str):
        self.log_file = Path(log_file_path)
        self.suspicious_patterns = [
            {'type': 'excessive_access', 'threshold': 100, 'window': 3600},  # 1時間に100回以上
            {'type': 'failed_attempts', 'threshold': 10, 'window': 300},      # 5分に10回以上の失敗
            {'type': 'unusual_hours', 'start': 0, 'end': 6}                  # 深夜時間帯
        ]
    
    def load_logs(self, hours_back: int = 24) -> List[Dict]:
        """指定した時間範囲のログを読み込み"""
        cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=hours_back)
        
        logs = []
        if self.log_file.exists():
            with self.log_file.open('r') as f:
                for line in f:
                    try:
                        log_entry = json.loads(line.strip())
                        log_time = datetime.datetime.fromisoformat(log_entry['timestamp'])
                        if log_time >= cutoff_time:
                            logs.append(log_entry)
                    except (json.JSONDecodeError, KeyError):
                        continue
        
        return logs
    
    def detect_suspicious_activity(self) -> List[Dict]:
        """疑わしいアクティビティの検出"""
        logs = self.load_logs()
        alerts = []
        
        # クライアント別の活動を分析
        client_activity = {}
        for log in logs:
            client_id = log['client_id']
            if client_id not in client_activity:
                client_activity[client_id] = []
            client_activity[client_id].append(log)
        
        for client_id, activities in client_activity.items():
            alerts.extend(self.analyze_client_activity(client_id, activities))
        
        return alerts
    
    def analyze_client_activity(self, client_id: str, activities: List[Dict]) -> List[Dict]:
        """クライアント単位での活動分析"""
        alerts = []
        
        # 過度なアクセス検知
        recent_accesses = [a for a in activities 
                          if (datetime.datetime.now() - 
                              datetime.datetime.fromisoformat(a['timestamp'])).seconds < 3600]
        
        if len(recent_accesses) > 100:
            alerts.append({
                'type': 'excessive_access',
                'client_id': client_id,
                'count': len(recent_accesses),
                'severity': 'HIGH'
            })
        
        # 連続した失敗アクセス
        failed_attempts = [a for a in activities if not a['success']]
        recent_failures = [a for a in failed_attempts
                          if (datetime.datetime.now() - 
                              datetime.datetime.fromisoformat(a['timestamp'])).seconds < 300]
        
        if len(recent_failures) > 10:
            alerts.append({
                'type': 'multiple_failures',
                'client_id': client_id,
                'count': len(recent_failures),
                'severity': 'MEDIUM'
            })
        
        return alerts

# モニタリングの実行
monitor = SecurityMonitor('access_logs.jsonl')
alerts = monitor.detect_suspicious_activity()

for alert in alerts:
    print(f"[{alert['severity']}] {alert['type']}: Client {alert['client_id']} - Count: {alert['count']}")

アクセスパターンの可視化

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

class AccessAnalyzer:
    def __init__(self, log_file_path: str):
        self.log_file = Path(log_file_path)
    
    def load_logs_as_dataframe(self, days_back: int = 7) -> pd.DataFrame:
        """ログをDataFrameとして読み込み"""
        cutoff_time = datetime.now() - timedelta(days=days_back)
        
        logs = []
        if self.log_file.exists():
            with self.log_file.open('r') as f:
                for line in f:
                    try:
                        log_entry = json.loads(line.strip())
                        log_time = datetime.fromisoformat(log_entry['timestamp'])
                        if log_time >= cutoff_time:
                            logs.append(log_entry)
                    except (json.JSONDecodeError, KeyError):
                        continue
        
        df = pd.DataFrame(logs)
        if not df.empty:
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df['hour'] = df['timestamp'].dt.hour
            df['day_of_week'] = df['timestamp'].dt.day_name()
        
        return df
    
    def create_access_dashboard(self):
        """アクセス分析ダッシュボードの作成"""
        df = self.load_logs_as_dataframe()
        
        if df.empty:
            print("No data available for analysis")
            return
        
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle('MCP Access Analysis Dashboard', fontsize=16)
        
        # 1. 時間別アクセス分布
        hourly_access = df.groupby('hour').size()
        axes[0, 0].bar(hourly_access.index, hourly_access.values)
        axes[0, 0].set_title('Access by Hour')
        axes[0, 0].set_xlabel('Hour of Day')
        axes[0, 0].set_ylabel('Access Count')
        
        # 2. クライアント別アクセス
        client_access = df['client_id'].value_counts()
        axes[0, 1].pie(client_access.values, labels=client_access.index, autopct='%1.1f%%')
        axes[0, 1].set_title('Access by Client')
        
        # 3. 成功/失敗率
        success_rate = df['success'].value_counts()
        axes[1, 0].bar(['Success', 'Failed'], [success_rate.get(True, 0), success_rate.get(False, 0)])
        axes[1, 0].set_title('Success vs Failed Requests')
        axes[1, 0].set_ylabel('Count')
        
        # 4. 最もアクセスされるリソース
        top_resources = df['resource_uri'].value_counts().head(10)
        axes[1, 1].barh(range(len(top_resources)), top_resources.values)
        axes[1, 1].set_yticks(range(len(top_resources)))
        axes[1, 1].set_yticklabels([r.split('/')[-1][:20] for r in top_resources.index])
        axes[1, 1].set_title('Top 10 Accessed Resources')
        axes[1, 1].set_xlabel('Access Count')
        
        plt.tight_layout()
        plt.show()
    
    def generate_access_report(self) -> Dict:
        """アクセスレポートの生成"""
        df = self.load_logs_as_dataframe()
        
        if df.empty:
            return {'error': 'No data available'}
        
        report = {
            'summary': {
                'total_requests': len(df),
                'unique_clients': df['client_id'].nunique(),
                'unique_resources': df['resource_uri'].nunique(),
                'success_rate': (df['success'].sum() / len(df)) * 100
            },
            'top_clients': df['client_id'].value_counts().head(5).to_dict(),
            'top_resources': df['resource_uri'].value_counts().head(10).to_dict(),
            'hourly_distribution': df.groupby('hour').size().to_dict(),
            'failed_requests': df[df['success'] == False]['resource_uri'].value_counts().head(5).to_dict()
        }
        
        return report

# 分析の実行
analyzer = AccessAnalyzer('access_logs.jsonl')
report = analyzer.generate_access_report()

print("=== MCP Access Report ===")
print(f"Total Requests: {report['summary']['total_requests']}")
print(f"Success Rate: {report['summary']['success_rate']:.1f}%")
print(f"Unique Clients: {report['summary']['unique_clients']}")
print(f"Unique Resources: {report['summary']['unique_resources']}")

コンプライアンス対応とデータ保護

GDPR対応のプライバシー保護

class PrivacyCompliantLogger:
    def __init__(self):
        self.encryption_key = self.generate_encryption_key()
        self.retention_days = 365  # データ保持期間
    
    def anonymize_client_id(self, client_id: str) -> str:
        """クライアントIDの匿名化"""
        return hashlib.sha256(client_id.encode()).hexdigest()[:16]
    
    def sanitize_resource_uri(self, uri: str) -> str:
        """リソースURIから個人情報を除去"""
        # 個人情報を含む可能性のあるパスを一般化
        sanitized = uri
        
        # 例:ユーザーIDを含むパスの匿名化
        import re
        sanitized = re.sub(r'/users/\d+/', '/users/[USER]/', sanitized)
        sanitized = re.sub(r'/profiles/[^/]+/', '/profiles/[PROFILE]/', sanitized)
        
        return sanitized
    
    def create_privacy_compliant_log(self, client_id: str, resource_uri: str, 
                                   action: str, success: bool) -> Dict:
        """プライバシー準拠のログエントリ作成"""
        return {
            'timestamp': datetime.datetime.now().isoformat(),
            'client_hash': self.anonymize_client_id(client_id),
            'resource_pattern': self.sanitize_resource_uri(resource_uri),
            'action': action,
            'success': success,
            'retention_until': (datetime.datetime.now() + 
                              datetime.timedelta(days=self.retention_days)).isoformat()
        }
    
    def cleanup_expired_logs(self, log_file: Path):
        """保持期限を過ぎたログの削除"""
        if not log_file.exists():
            return
        
        valid_logs = []
        now = datetime.datetime.now()
        
        with log_file.open('r') as f:
            for line in f:
                try:
                    log_entry = json.loads(line.strip())
                    retention_until = datetime.datetime.fromisoformat(log_entry['retention_until'])
                    
                    if retention_until > now:
                        valid_logs.append(log_entry)
                except (json.JSONDecodeError, KeyError):
                    continue
        
        # 有効なログのみを書き戻し
        with log_file.open('w') as f:
            for log_entry in valid_logs:
                f.write(json.dumps(log_entry) + '\n')
        
        print(f"Cleaned up expired logs. Retained {len(valid_logs)} entries.")

セキュリティベストプラクティス

1. アクセス制御の階層化

class HierarchicalAccessControl:
    def __init__(self):
        self.roles = {
            'viewer': ['read'],
            'editor': ['read', 'write'],
            'admin': ['read', 'write', 'delete', 'manage']
        }
        self.resource_groups = {
            'public': ['file://public/*'],
            'internal': ['file://internal/*'],
            'confidential': ['file://confidential/*']
        }
    
    def check_permission(self, client_role: str, action: str, resource_group: str) -> bool:
        """階層的権限チェック"""
        allowed_actions = self.roles.get(client_role, [])
        
        # リソースグループレベルでの制限
        if resource_group == 'confidential' and client_role != 'admin':
            return False
        
        return action in allowed_actions

2. 異常検知システム

from sklearn.ensemble import IsolationForest
import numpy as np

class AnomalyDetector:
    def __init__(self):
        self.model = IsolationForest(contamination=0.1, random_state=42)
        self.is_trained = False
    
    def prepare_features(self, logs: List[Dict]) -> np.ndarray:
        """ログデータから特徴量を抽出"""
        features = []
        
        for log in logs:
            timestamp = datetime.fromisoformat(log['timestamp'])
            feature_vector = [
                timestamp.hour,  # 時間
                timestamp.weekday(),  # 曜日
                len(log['resource_uri']),  # リソースURI長
                1 if log['success'] else 0,  # 成功フラグ
            ]
            features.append(feature_vector)
        
        return np.array(features)
    
    def train(self, logs: List[Dict]):
        """正常パターンでモデルを訓練"""
        features = self.prepare_features(logs)
        self.model.fit(features)
        self.is_trained = True
    
    def detect_anomalies(self, logs: List[Dict]) -> List[int]:
        """異常なアクセスパターンを検出"""
        if not self.is_trained:
            raise ValueError("Model must be trained first")
        
        features = self.prepare_features(logs)
        anomaly_scores = self.model.decision_function(features)
        anomalies = self.model.predict(features)
        
        # 異常と判定されたインデックスを返す
        return [i for i, anomaly in enumerate(anomalies) if anomaly == -1]

まとめ

MCPを活用したコンテンツアクセス管理により、以下の利益が得られます:

セキュリティの向上

  • 認証・認可の標準化: 統一されたアクセス制御メカニズム
  • 詳細なログ記録: すべてのアクセスの透明性確保
  • 異常検知: 自動化されたセキュリティモニタリング

コンプライアンス対応

  • データ保護規則への準拠: GDPRなどの要件に対応
  • 監査証跡: アクセス履歴の完全な記録
  • プライバシー保護: 適切な匿名化とデータ保持期間管理

運用効率の改善

  • 自動化されたモニタリング: 人的リソースの効率的活用
  • 可視化ダッシュボード: データドリブンな意思決定支援
  • 予防的セキュリティ: 問題の早期発見と対応

MCPの標準化されたアプローチにより、セキュアで透明性の高いコンテンツアクセス管理システムを構築できます。これは、クリエイターや組織にとって、デジタルアセットの価値を最大化しながらリスクを最小化する重要な基盤となります。


注意: MCPは発展中の技術です。実装の詳細は公式ドキュメント(https://modelcontextprotocol.io/)で最新情報を確認してください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?