はじめに
Model Context Protocol (MCP) を通じてLLMアプリケーションが外部データソースからコンテンツを取得する際、そのデータの**完全性(integrity)と鮮度(freshness)**を確保することは重要です。
本記事では、ハッシュ値とタイムスタンプを活用したコンテンツ検証の実装方法について解説します。
注意: これらの機能はMCPの標準仕様には含まれていませんが、メタデータやカスタム実装を通じて追加できます。
1. コンテンツ完全性検証の必要性
1.1. 想定されるリスク
MCPを介したデータ取得では、以下のようなリスクが存在します。
| リスク | 説明 | 影響 |
|---|---|---|
| データ破損 | ネットワーク転送中のエラー | 不完全なデータによる誤った推論 |
| 意図しない改変 | キャッシュやプロキシでの変更 | 元データと異なる情報の利用 |
| 古いデータの利用 | 更新されていないキャッシュ | 時代遅れの情報に基づく回答 |
| 中間者攻撃 | 悪意ある第三者によるデータ改ざん | セキュリティ侵害 |
1.2. 検証の目的
- 完全性(Integrity): データが改ざんされていないことの確認
- 鮮度(Freshness): データが十分に新しいことの確認
- 信頼性(Authenticity): データが正当な送信元からのものであることの確認
2. 現在のMCP仕様における制約
2.1. 標準仕様の範囲
MCPの現行仕様では、以下のような基本的なメタデータをサポートしています。
interface Resource {
uri: string;
name: string;
description?: string;
mimeType?: string;
}
制約:
- ハッシュ値やタイムスタンプは標準フィールドに含まれていない
- 完全性検証の機能は組み込まれていない
- カスタムメタデータで拡張する必要がある
2.2. 拡張可能性
ただし、実装によっては以下のように拡張できます。
interface EnhancedResource extends Resource {
annotations?: {
integrity?: {
hash: string;
algorithm: string;
timestamp: string;
};
// その他のカスタムメタデータ
};
}
3. ハッシュ値による完全性検証
3.1. 基本的な実装
コンテンツのハッシュ値を計算し、送信側と受信側で照合します。
import hashlib
import json
from datetime import datetime
from typing import Optional, Dict, Any
class ContentIntegrityChecker:
"""コンテンツの完全性検証クラス"""
SUPPORTED_ALGORITHMS = ['sha256', 'sha512']
@staticmethod
def calculate_hash(content: str, algorithm: str = 'sha256') -> str:
"""
コンテンツのハッシュ値を計算
Args:
content: ハッシュ化するコンテンツ
algorithm: ハッシュアルゴリズム(sha256またはsha512)
Returns:
16進数表現のハッシュ値
"""
if algorithm not in ContentIntegrityChecker.SUPPORTED_ALGORITHMS:
raise ValueError(f"Unsupported algorithm: {algorithm}")
# コンテンツを正規化(改行コードの統一など)
normalized_content = content.encode('utf-8')
if algorithm == 'sha256':
hash_obj = hashlib.sha256(normalized_content)
elif algorithm == 'sha512':
hash_obj = hashlib.sha512(normalized_content)
return hash_obj.hexdigest()
@staticmethod
def verify_integrity(
content: str,
expected_hash: str,
algorithm: str = 'sha256'
) -> tuple[bool, Optional[str]]:
"""
コンテンツの完全性を検証
Returns:
(検証結果, エラーメッセージ)
"""
try:
actual_hash = ContentIntegrityChecker.calculate_hash(content, algorithm)
if actual_hash == expected_hash:
return True, None
else:
return False, f"Hash mismatch: expected {expected_hash}, got {actual_hash}"
except Exception as e:
return False, f"Verification error: {str(e)}"
# 使用例
content = "This is important data that should not be modified."
# 送信側:ハッシュを計算
hash_value = ContentIntegrityChecker.calculate_hash(content)
print(f"Hash: {hash_value}")
# 受信側:ハッシュを検証
is_valid, error = ContentIntegrityChecker.verify_integrity(content, hash_value)
if is_valid:
print("✅ Content integrity verified")
else:
print(f"❌ Integrity check failed: {error}")
3.2. MCPサーバー側の実装
送信側(MCPサーバー)でハッシュを計算し、メタデータに含めます。
from dataclasses import dataclass, asdict
from typing import Optional
import json
@dataclass
class IntegrityMetadata:
"""完全性検証のためのメタデータ"""
hash: str
algorithm: str
timestamp: str
content_length: int
class IntegrityAwareMCPServer:
"""完全性検証機能を持つMCPサーバー"""
def __init__(self):
self.checker = ContentIntegrityChecker()
def prepare_resource(self, content: str, resource_id: str) -> dict:
"""
リソースを完全性メタデータ付きで準備
Returns:
MCPリソース形式の辞書
"""
# ハッシュ値の計算
hash_value = self.checker.calculate_hash(content, algorithm='sha256')
# メタデータの作成
integrity_metadata = IntegrityMetadata(
hash=hash_value,
algorithm='sha256',
timestamp=datetime.utcnow().isoformat() + 'Z',
content_length=len(content.encode('utf-8'))
)
# MCPリソース形式
resource = {
'uri': f'content://{resource_id}',
'name': resource_id,
'mimeType': 'text/plain',
'annotations': {
'integrity': asdict(integrity_metadata)
}
}
return resource, content
def get_resource_with_integrity(self, resource_id: str) -> tuple[dict, str]:
"""
完全性情報付きでリソースを取得
"""
# 実際のコンテンツ取得(例)
content = self._fetch_content(resource_id)
# メタデータ付きで返す
return self.prepare_resource(content, resource_id)
def _fetch_content(self, resource_id: str) -> str:
"""コンテンツの取得(実装例)"""
# 実際はデータベースやファイルシステムから取得
return f"Content for resource {resource_id}"
# 使用例
server = IntegrityAwareMCPServer()
resource, content = server.get_resource_with_integrity("doc-001")
print("Resource metadata:")
print(json.dumps(resource, indent=2))
print(f"\nContent: {content}")
3.3. MCPクライアント側の実装
受信側(MCPクライアント)でハッシュを検証します。
class IntegrityVerifyingClient:
"""完全性検証機能を持つMCPクライアント"""
def __init__(self):
self.checker = ContentIntegrityChecker()
self.verification_log = []
def verify_and_use_resource(
self,
resource_metadata: dict,
content: str
) -> tuple[bool, Optional[str]]:
"""
リソースの完全性を検証してから使用
Returns:
(検証成功/失敗, エラーメッセージ)
"""
# メタデータから完全性情報を取得
integrity_info = resource_metadata.get('annotations', {}).get('integrity')
if not integrity_info:
# 完全性情報がない場合は警告
self._log_verification('warning', resource_metadata['uri'],
"No integrity metadata available")
return True, "No integrity check performed (metadata missing)"
# ハッシュ値の検証
expected_hash = integrity_info.get('hash')
algorithm = integrity_info.get('algorithm', 'sha256')
is_valid, error = self.checker.verify_integrity(
content, expected_hash, algorithm
)
# 検証結果をログ
if is_valid:
self._log_verification('success', resource_metadata['uri'],
f"Hash verified ({algorithm})")
else:
self._log_verification('failure', resource_metadata['uri'], error)
# コンテンツ長の検証(オプション)
expected_length = integrity_info.get('content_length')
if expected_length:
actual_length = len(content.encode('utf-8'))
if actual_length != expected_length:
return False, f"Content length mismatch: expected {expected_length}, got {actual_length}"
return is_valid, error
def _log_verification(self, status: str, resource_uri: str, message: str):
"""検証結果をログに記録"""
log_entry = {
'timestamp': datetime.utcnow().isoformat() + 'Z',
'status': status,
'resource_uri': resource_uri,
'message': message
}
self.verification_log.append(log_entry)
# コンソールにも出力
emoji = {
'success': '✅',
'failure': '❌',
'warning': '⚠️'
}.get(status, '❓')
print(f"{emoji} {status.upper()}: {resource_uri} - {message}")
def get_verification_report(self) -> dict:
"""検証レポートを取得"""
total = len(self.verification_log)
success = sum(1 for log in self.verification_log if log['status'] == 'success')
failure = sum(1 for log in self.verification_log if log['status'] == 'failure')
warning = sum(1 for log in self.verification_log if log['status'] == 'warning')
return {
'total_verifications': total,
'successful': success,
'failed': failure,
'warnings': warning,
'logs': self.verification_log
}
# 使用例
client = IntegrityVerifyingClient()
# リソースを検証
is_valid, error = client.verify_and_use_resource(resource, content)
if is_valid:
print("\n✅ Content can be safely used")
# LLMに渡す処理
else:
print(f"\n❌ Content verification failed: {error}")
# エラー処理
# レポート生成
report = client.get_verification_report()
print("\nVerification Report:")
print(json.dumps(report, indent=2))
4. タイムスタンプによる鮮度検証
4.1. 基本的な実装
コンテンツの作成時刻や有効期限を管理します。
from datetime import datetime, timedelta
from typing import Optional
class FreshnessChecker:
"""コンテンツの鮮度検証クラス"""
@staticmethod
def parse_timestamp(timestamp_str: str) -> datetime:
"""ISO 8601形式のタイムスタンプをパース"""
# 'Z'を'+00:00'に置換してパース
timestamp_str = timestamp_str.replace('Z', '+00:00')
return datetime.fromisoformat(timestamp_str)
@staticmethod
def is_fresh(
timestamp: str,
max_age_hours: Optional[int] = None,
expiry_time: Optional[str] = None
) -> tuple[bool, Optional[str]]:
"""
コンテンツが新鮮かどうかを判定
Args:
timestamp: コンテンツのタイムスタンプ
max_age_hours: 最大許容経過時間(時間)
expiry_time: 明示的な有効期限
Returns:
(鮮度判定結果, 理由)
"""
try:
content_time = FreshnessChecker.parse_timestamp(timestamp)
current_time = datetime.now(content_time.tzinfo)
# 有効期限チェック
if expiry_time:
expiry = FreshnessChecker.parse_timestamp(expiry_time)
if current_time > expiry:
age = current_time - expiry
return False, f"Content expired {age.total_seconds() / 3600:.1f} hours ago"
# 最大経過時間チェック
if max_age_hours is not None:
age = current_time - content_time
age_hours = age.total_seconds() / 3600
if age_hours > max_age_hours:
return False, f"Content is {age_hours:.1f} hours old (max: {max_age_hours} hours)"
return True, "Content is fresh"
except Exception as e:
return False, f"Timestamp validation error: {str(e)}"
@staticmethod
def calculate_expiry(created_at: str, ttl_hours: int) -> str:
"""
作成時刻とTTLから有効期限を計算
Args:
created_at: 作成時刻(ISO 8601形式)
ttl_hours: 有効時間(時間)
Returns:
有効期限(ISO 8601形式)
"""
created = FreshnessChecker.parse_timestamp(created_at)
expiry = created + timedelta(hours=ttl_hours)
return expiry.isoformat().replace('+00:00', 'Z')
# 使用例
timestamp = "2024-01-15T10:30:00Z"
# 24時間以内かチェック
is_fresh, reason = FreshnessChecker.is_fresh(timestamp, max_age_hours=24)
print(f"Freshness check: {is_fresh} - {reason}")
# 有効期限を計算
expiry = FreshnessChecker.calculate_expiry(timestamp, ttl_hours=48)
print(f"Content expires at: {expiry}")
# 有効期限でチェック
is_valid, reason = FreshnessChecker.is_fresh(timestamp, expiry_time=expiry)
print(f"Expiry check: {is_valid} - {reason}")
4.2. 鮮度メタデータの管理
@dataclass
class FreshnessMetadata:
"""鮮度管理のためのメタデータ"""
created_at: str
last_modified: str
expires_at: Optional[str] = None
ttl_hours: Optional[int] = None
source_timestamp: Optional[str] = None # データソースでの更新時刻
class FreshnessAwareServer:
"""鮮度管理機能を持つサーバー"""
def prepare_resource_with_freshness(
self,
content: str,
resource_id: str,
ttl_hours: int = 24
) -> dict:
"""鮮度メタデータ付きでリソースを準備"""
now = datetime.utcnow().isoformat() + 'Z'
expires_at = FreshnessChecker.calculate_expiry(now, ttl_hours)
freshness_metadata = FreshnessMetadata(
created_at=now,
last_modified=now,
expires_at=expires_at,
ttl_hours=ttl_hours
)
resource = {
'uri': f'content://{resource_id}',
'name': resource_id,
'mimeType': 'text/plain',
'annotations': {
'freshness': asdict(freshness_metadata)
}
}
return resource, content
class FreshnessVerifyingClient:
"""鮮度検証機能を持つクライアント"""
def __init__(self, default_max_age_hours: int = 24):
self.default_max_age_hours = default_max_age_hours
self.checker = FreshnessChecker()
def verify_freshness(
self,
resource_metadata: dict,
max_age_hours: Optional[int] = None
) -> tuple[bool, str, Optional[dict]]:
"""
リソースの鮮度を検証
Returns:
(検証結果, メッセージ, 鮮度情報)
"""
freshness_info = resource_metadata.get('annotations', {}).get('freshness')
if not freshness_info:
return True, "No freshness metadata available", None
max_age = max_age_hours or self.default_max_age_hours
# タイムスタンプで検証
timestamp = freshness_info.get('created_at')
expiry = freshness_info.get('expires_at')
is_fresh, reason = self.checker.is_fresh(
timestamp,
max_age_hours=max_age,
expiry_time=expiry
)
# 鮮度情報を返す
if timestamp:
created = self.checker.parse_timestamp(timestamp)
age_hours = (datetime.now(created.tzinfo) - created).total_seconds() / 3600
freshness_data = {
'age_hours': round(age_hours, 2),
'created_at': timestamp,
'expires_at': expiry,
'is_fresh': is_fresh
}
else:
freshness_data = None
return is_fresh, reason, freshness_data
def use_resource_with_freshness_check(
self,
resource_metadata: dict,
content: str,
max_age_hours: Optional[int] = None,
allow_stale: bool = False
) -> tuple[bool, str, Optional[str]]:
"""
鮮度をチェックしてからリソースを使用
Args:
allow_stale: Trueの場合、古いデータでも警告付きで使用
Returns:
(使用可否, メッセージ, 警告テキスト)
"""
is_fresh, reason, freshness_data = self.verify_freshness(
resource_metadata, max_age_hours
)
if is_fresh:
return True, reason, None
if allow_stale:
# 古いデータでも使用するが警告を付与
if freshness_data:
age = freshness_data['age_hours']
warning = f"\n⚠️ Note: This information is {age:.1f} hours old and may be outdated."
return True, f"Using stale content: {reason}", warning
return False, f"Content too old: {reason}", None
# 使用例
server = FreshnessAwareServer()
resource, content = server.prepare_resource_with_freshness("Sample content", "doc-001", ttl_hours=12)
client = FreshnessVerifyingClient(default_max_age_hours=24)
can_use, message, warning = client.use_resource_with_freshness_check(
resource, content, allow_stale=True
)
print(f"Can use: {can_use}")
print(f"Message: {message}")
if warning:
print(f"Warning: {warning}")
5. 統合実装:完全性と鮮度の両方
完全性検証と鮮度検証を統合したクラスです。
@dataclass
class ComprehensiveMetadata:
"""包括的な検証メタデータ"""
# 完全性関連
hash: str
algorithm: str
content_length: int
# 鮮度関連
created_at: str
last_modified: str
expires_at: Optional[str] = None
# その他
version: str = "1.0"
source: Optional[str] = None
class ComprehensiveVerifier:
"""完全性と鮮度の両方を検証"""
def __init__(self):
self.integrity_checker = ContentIntegrityChecker()
self.freshness_checker = FreshnessChecker()
self.verification_log = []
def verify_resource(
self,
resource_metadata: dict,
content: str,
max_age_hours: Optional[int] = 24,
require_integrity: bool = True,
require_freshness: bool = True
) -> tuple[bool, dict]:
"""
リソースを包括的に検証
Returns:
(検証成功/失敗, 検証結果の詳細)
"""
result = {
'overall_valid': True,
'checks': {},
'warnings': [],
'errors': []
}
metadata = resource_metadata.get('annotations', {})
# 完全性検証
if require_integrity:
integrity_info = metadata.get('integrity')
if integrity_info:
is_valid, error = self.integrity_checker.verify_integrity(
content,
integrity_info['hash'],
integrity_info.get('algorithm', 'sha256')
)
result['checks']['integrity'] = is_valid
if not is_valid:
result['overall_valid'] = False
result['errors'].append(f"Integrity check failed: {error}")
else:
result['warnings'].append("No integrity metadata available")
# 鮮度検証
if require_freshness:
freshness_info = metadata.get('freshness')
if freshness_info:
is_fresh, reason = self.freshness_checker.is_fresh(
freshness_info['created_at'],
max_age_hours=max_age_hours,
expiry_time=freshness_info.get('expires_at')
)
result['checks']['freshness'] = is_fresh
if not is_fresh:
result['warnings'].append(f"Freshness check: {reason}")
else:
result['warnings'].append("No freshness metadata available")
# ログに記録
self._log_verification(resource_metadata['uri'], result)
return result['overall_valid'], result
def _log_verification(self, resource_uri: str, result: dict):
"""検証結果をログに記録"""
log_entry = {
'timestamp': datetime.utcnow().isoformat() + 'Z',
'resource_uri': resource_uri,
'result': result
}
self.verification_log.append(log_entry)
# 使用例
verifier = ComprehensiveVerifier()
# 包括的な検証
is_valid, result = verifier.verify_resource(
resource_metadata=resource,
content=content,
max_age_hours=24,
require_integrity=True,
require_freshness=True
)
print("Verification Result:")
print(json.dumps(result, indent=2))
if is_valid:
print("\n✅ Resource passed all checks and can be used safely")
else:
print("\n❌ Resource failed verification")
for error in result['errors']:
print(f" - {error}")
if result['warnings']:
print("\n⚠️ Warnings:")
for warning in result['warnings']:
print(f" - {warning}")
6. 実装上の考慮事項
6.1. パフォーマンスへの影響
ハッシュ計算のコスト:
import time
def benchmark_hash_calculation(content_sizes_mb: list):
"""ハッシュ計算のパフォーマンス測定"""
checker = ContentIntegrityChecker()
for size_mb in content_sizes_mb:
# テストデータ生成
content = "x" * (size_mb * 1024 * 1024)
# SHA-256
start = time.time()
hash_sha256 = checker.calculate_hash(content, 'sha256')
time_sha256 = time.time() - start
# SHA-512
start = time.time()
hash_sha512 = checker.calculate_hash(content, 'sha512')
time_sha512 = time.time() - start
print(f"{size_mb}MB:")
print(f" SHA-256: {time_sha256:.3f}s")
print(f" SHA-512: {time_sha512:.3f}s")
# 小さいコンテンツではオーバーヘッドは無視できる
# 大きいコンテンツでは非同期処理を検討
最適化手法:
- 小さいコンテンツ(< 1MB): 同期的に計算
- 大きいコンテンツ(>= 1MB): 非同期処理またはチャンク単位で計算
- キャッシング: 同じコンテンツの再検証を避ける
6.2. エラーハンドリング
class IntegrityError(Exception):
"""完全性検証エラー"""
pass
class FreshnessError(Exception):
"""鮮度検証エラー"""
pass
def safe_verify_and_use(
resource_metadata: dict,
content: str,
strict_mode: bool = False
) -> Optional[str]:
"""
安全な検証と使用
Args:
strict_mode: Trueの場合、警告も致命的エラーとして扱う
"""
verifier = ComprehensiveVerifier()
try:
is_valid, result = verifier.verify_resource(
resource_metadata, content
)
if not is_valid:
raise IntegrityError(f"Verification failed: {result['errors']}")
if strict_mode and result['warnings']:
raise FreshnessError(f"Strict mode: {result['warnings']}")
return content
except IntegrityError as e:
print(f"❌ Integrity error: {e}")
# ログ記録、アラート送信など
return None
except FreshnessError as e:
print(f"⚠️ Freshness warning: {e}")
if strict_mode:
return None
# 警告付きで使用を許可
return content
except Exception as e:
print(f"❌ Unexpected error: {e}")
return None
6.3. セキュリティ考慮事項
ハッシュアルゴリズムの選択:
- SHA-256: バランスが良く、広く使われている(推奨)
- SHA-512: より高いセキュリティが必要な場合
- MD5, SHA-1: 非推奨(衝突攻撃に脆弱)
タイムスタンプの信頼性:
- サーバー時刻の同期(NTP)が重要
- タイムゾーンの統一(UTC推奨)
- タイムスタンプの改ざん防止(署名など)
まとめ
MCPにおけるコンテンツの完全性と鮮度の検証は、以下のアプローチで実装できます。
実装のポイント:
-
ハッシュ値による完全性検証
- 送信側でハッシュを計算
- メタデータに含めて送信
- 受信側で再計算して照合
-
タイムスタンプによる鮮度検証
- 作成時刻と有効期限を管理
- 最大経過時間をチェック
- 古いデータには警告を付与
-
統合的なアプローチ
- 両方の検証を組み合わせる
- 適切なエラーハンドリング
- パフォーマンスとセキュリティのバランス
制限事項:
- MCPの標準仕様には含まれない(カスタム実装が必要)
- 完全な改ざん防止には署名やPKIが必要
- ネットワーク層のセキュリティ(TLS等)との併用が望ましい
7. 高度な実装例
7.1. 署名による認証の追加
完全性だけでなく、送信元の認証も行う場合は、デジタル署名を使用します。
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.backends import default_backend
import base64
class SignatureVerifier:
"""デジタル署名による認証"""
def __init__(self):
# 実際の運用では、鍵管理システムから取得
self.private_key = None
self.public_key = None
def generate_keypair(self):
"""鍵ペアの生成(デモ用)"""
self.private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
self.public_key = self.private_key.public_key()
def sign_content(self, content: str) -> str:
"""
コンテンツに署名
Returns:
Base64エンコードされた署名
"""
if not self.private_key:
raise ValueError("Private key not available")
content_bytes = content.encode('utf-8')
signature = self.private_key.sign(
content_bytes,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return base64.b64encode(signature).decode('utf-8')
def verify_signature(
self,
content: str,
signature_b64: str
) -> tuple[bool, Optional[str]]:
"""
署名を検証
Returns:
(検証結果, エラーメッセージ)
"""
if not self.public_key:
return False, "Public key not available"
try:
content_bytes = content.encode('utf-8')
signature = base64.b64decode(signature_b64)
self.public_key.verify(
signature,
content_bytes,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return True, None
except Exception as e:
return False, f"Signature verification failed: {str(e)}"
# 使用例
signer = SignatureVerifier()
signer.generate_keypair()
content = "Important authenticated content"
# 送信側:署名
signature = signer.sign_content(content)
print(f"Signature: {signature[:50]}...")
# 受信側:検証
is_valid, error = signer.verify_signature(content, signature)
if is_valid:
print("✅ Signature verified - content is authentic")
else:
print(f"❌ Signature verification failed: {error}")
7.2. キャッシュとの統合
検証済みコンテンツをキャッシュし、パフォーマンスを向上させます。
from typing import Optional, Tuple
from datetime import datetime, timedelta
import hashlib
class VerifiedContentCache:
"""検証済みコンテンツのキャッシュ"""
def __init__(self, cache_ttl_minutes: int = 30):
self.cache = {}
self.cache_ttl = timedelta(minutes=cache_ttl_minutes)
def get_cache_key(self, resource_uri: str, content_hash: str) -> str:
"""キャッシュキーを生成"""
key_string = f"{resource_uri}:{content_hash}"
return hashlib.sha256(key_string.encode()).hexdigest()
def get(
self,
resource_uri: str,
content_hash: str
) -> Optional[Tuple[str, dict]]:
"""
キャッシュから取得
Returns:
(コンテンツ, 検証結果) or None
"""
cache_key = self.get_cache_key(resource_uri, content_hash)
if cache_key in self.cache:
cached_data = self.cache[cache_key]
cached_time = cached_data['cached_at']
# TTLチェック
if datetime.now() - cached_time < self.cache_ttl:
return cached_data['content'], cached_data['verification_result']
else:
# 期限切れのエントリを削除
del self.cache[cache_key]
return None
def put(
self,
resource_uri: str,
content_hash: str,
content: str,
verification_result: dict
):
"""キャッシュに保存"""
cache_key = self.get_cache_key(resource_uri, content_hash)
self.cache[cache_key] = {
'content': content,
'verification_result': verification_result,
'cached_at': datetime.now()
}
def invalidate(self, resource_uri: str = None):
"""キャッシュを無効化"""
if resource_uri:
# 特定のリソースのキャッシュのみ削除
keys_to_delete = [
key for key, value in self.cache.items()
if resource_uri in str(value)
]
for key in keys_to_delete:
del self.cache[key]
else:
# 全キャッシュをクリア
self.cache.clear()
def get_stats(self) -> dict:
"""キャッシュ統計を取得"""
total_entries = len(self.cache)
total_size_bytes = sum(
len(entry['content'].encode('utf-8'))
for entry in self.cache.values()
)
return {
'total_entries': total_entries,
'total_size_mb': round(total_size_bytes / (1024 * 1024), 2),
'cache_ttl_minutes': self.cache_ttl.total_seconds() / 60
}
class CachedVerifier(ComprehensiveVerifier):
"""キャッシュ機能付き検証クラス"""
def __init__(self, cache_ttl_minutes: int = 30):
super().__init__()
self.cache = VerifiedContentCache(cache_ttl_minutes)
self.cache_hits = 0
self.cache_misses = 0
def verify_resource_cached(
self,
resource_metadata: dict,
content: str,
**kwargs
) -> Tuple[bool, dict]:
"""
キャッシュを使用した検証
"""
# ハッシュを計算
content_hash = self.integrity_checker.calculate_hash(content)
resource_uri = resource_metadata['uri']
# キャッシュ確認
cached = self.cache.get(resource_uri, content_hash)
if cached:
self.cache_hits += 1
content_cached, result = cached
print(f"✅ Cache hit for {resource_uri}")
return result['overall_valid'], result
# キャッシュミス:検証を実行
self.cache_misses += 1
print(f"❌ Cache miss for {resource_uri}")
is_valid, result = self.verify_resource(
resource_metadata, content, **kwargs
)
# 検証成功時のみキャッシュに保存
if is_valid:
self.cache.put(resource_uri, content_hash, content, result)
return is_valid, result
def get_cache_stats(self) -> dict:
"""キャッシュ統計を取得"""
total_requests = self.cache_hits + self.cache_misses
hit_rate = (self.cache_hits / total_requests * 100) if total_requests > 0 else 0
cache_stats = self.cache.get_stats()
cache_stats.update({
'cache_hits': self.cache_hits,
'cache_misses': self.cache_misses,
'hit_rate_percent': round(hit_rate, 2)
})
return cache_stats
# 使用例
cached_verifier = CachedVerifier(cache_ttl_minutes=30)
# 初回:検証を実行
is_valid1, result1 = cached_verifier.verify_resource_cached(
resource, content, max_age_hours=24
)
# 2回目:キャッシュから取得(高速)
is_valid2, result2 = cached_verifier.verify_resource_cached(
resource, content, max_age_hours=24
)
# 統計表示
stats = cached_verifier.get_cache_stats()
print("\nCache Statistics:")
print(json.dumps(stats, indent=2))
7.3. 非同期処理の実装
大きなコンテンツの検証を非同期で行います。
import asyncio
from typing import Coroutine
class AsyncIntegrityChecker:
"""非同期完全性検証クラス"""
@staticmethod
async def calculate_hash_async(
content: str,
algorithm: str = 'sha256'
) -> str:
"""非同期でハッシュを計算"""
# CPUバウンドな処理なので、スレッドプールで実行
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
ContentIntegrityChecker.calculate_hash,
content,
algorithm
)
@staticmethod
async def verify_integrity_async(
content: str,
expected_hash: str,
algorithm: str = 'sha256'
) -> Tuple[bool, Optional[str]]:
"""非同期で完全性を検証"""
actual_hash = await AsyncIntegrityChecker.calculate_hash_async(
content, algorithm
)
if actual_hash == expected_hash:
return True, None
else:
return False, f"Hash mismatch: expected {expected_hash}, got {actual_hash}"
class AsyncVerifier:
"""非同期検証クラス"""
def __init__(self):
self.integrity_checker = AsyncIntegrityChecker()
self.freshness_checker = FreshnessChecker()
async def verify_resource_async(
self,
resource_metadata: dict,
content: str,
max_age_hours: Optional[int] = 24
) -> Tuple[bool, dict]:
"""非同期でリソースを検証"""
result = {
'overall_valid': True,
'checks': {},
'warnings': [],
'errors': []
}
metadata = resource_metadata.get('annotations', {})
# 完全性検証(非同期)
integrity_task = None
integrity_info = metadata.get('integrity')
if integrity_info:
integrity_task = self.integrity_checker.verify_integrity_async(
content,
integrity_info['hash'],
integrity_info.get('algorithm', 'sha256')
)
# 鮮度検証(同期でも高速)
freshness_info = metadata.get('freshness')
if freshness_info:
is_fresh, reason = self.freshness_checker.is_fresh(
freshness_info['created_at'],
max_age_hours=max_age_hours,
expiry_time=freshness_info.get('expires_at')
)
result['checks']['freshness'] = is_fresh
if not is_fresh:
result['warnings'].append(f"Freshness check: {reason}")
# 完全性検証の結果を待つ
if integrity_task:
is_valid, error = await integrity_task
result['checks']['integrity'] = is_valid
if not is_valid:
result['overall_valid'] = False
result['errors'].append(f"Integrity check failed: {error}")
return result['overall_valid'], result
async def verify_multiple_resources(
self,
resources: list[Tuple[dict, str]]
) -> list[Tuple[bool, dict]]:
"""複数のリソースを並行して検証"""
tasks = [
self.verify_resource_async(metadata, content)
for metadata, content in resources
]
results = await asyncio.gather(*tasks)
return results
# 使用例
async def async_verification_example():
"""非同期検証の使用例"""
verifier = AsyncVerifier()
# 単一リソースの検証
is_valid, result = await verifier.verify_resource_async(
resource, content, max_age_hours=24
)
print("Async Verification Result:")
print(json.dumps(result, indent=2))
# 複数リソースの並行検証
resources = [
(resource, content),
(resource, content), # 同じリソースでデモ
(resource, content)
]
print("\nVerifying multiple resources in parallel...")
start_time = datetime.now()
results = await verifier.verify_multiple_resources(resources)
elapsed = (datetime.now() - start_time).total_seconds()
print(f"Verified {len(resources)} resources in {elapsed:.2f} seconds")
for i, (is_valid, result) in enumerate(results):
status = "✅" if is_valid else "❌"
print(f"{status} Resource {i+1}: {'Valid' if is_valid else 'Invalid'}")
# 実行
# asyncio.run(async_verification_example())
7.4. 監視とアラート
検証失敗時の監視とアラート機能です。
from enum import Enum
from typing import Callable, List
class AlertLevel(Enum):
"""アラートレベル"""
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
class Alert:
"""アラート情報"""
def __init__(
self,
level: AlertLevel,
message: str,
resource_uri: str,
details: dict
):
self.level = level
self.message = message
self.resource_uri = resource_uri
self.details = details
self.timestamp = datetime.utcnow().isoformat() + 'Z'
class AlertHandler:
"""アラート処理"""
def __init__(self):
self.handlers: List[Callable[[Alert], None]] = []
self.alert_history: List[Alert] = []
def register_handler(self, handler: Callable[[Alert], None]):
"""アラートハンドラを登録"""
self.handlers.append(handler)
def send_alert(self, alert: Alert):
"""アラートを送信"""
self.alert_history.append(alert)
for handler in self.handlers:
try:
handler(alert)
except Exception as e:
print(f"Alert handler error: {e}")
def get_alert_summary(self) -> dict:
"""アラートサマリーを取得"""
return {
'total_alerts': len(self.alert_history),
'by_level': {
level.value: sum(
1 for a in self.alert_history
if a.level == level
)
for level in AlertLevel
}
}
class MonitoredVerifier(ComprehensiveVerifier):
"""監視機能付き検証クラス"""
def __init__(self, alert_handler: AlertHandler):
super().__init__()
self.alert_handler = alert_handler
def verify_resource_monitored(
self,
resource_metadata: dict,
content: str,
**kwargs
) -> Tuple[bool, dict]:
"""監視機能付き検証"""
is_valid, result = self.verify_resource(
resource_metadata, content, **kwargs
)
resource_uri = resource_metadata['uri']
# エラーがある場合はアラート
if result['errors']:
alert = Alert(
level=AlertLevel.ERROR,
message=f"Verification failed for {resource_uri}",
resource_uri=resource_uri,
details={
'errors': result['errors'],
'checks': result['checks']
}
)
self.alert_handler.send_alert(alert)
# 警告がある場合も通知
elif result['warnings']:
alert = Alert(
level=AlertLevel.WARNING,
message=f"Verification warnings for {resource_uri}",
resource_uri=resource_uri,
details={
'warnings': result['warnings'],
'checks': result['checks']
}
)
self.alert_handler.send_alert(alert)
return is_valid, result
# アラートハンドラの実装例
def console_alert_handler(alert: Alert):
"""コンソールにアラートを出力"""
emoji = {
AlertLevel.INFO: "ℹ️",
AlertLevel.WARNING: "⚠️",
AlertLevel.ERROR: "❌",
AlertLevel.CRITICAL: "🚨"
}
print(f"\n{emoji[alert.level]} {alert.level.value.upper()} ALERT")
print(f"Time: {alert.timestamp}")
print(f"Message: {alert.message}")
print(f"Resource: {alert.resource_uri}")
print(f"Details: {json.dumps(alert.details, indent=2)}")
def log_alert_handler(alert: Alert):
"""ログファイルにアラートを記録"""
with open("verification_alerts.log", "a") as f:
f.write(json.dumps({
'timestamp': alert.timestamp,
'level': alert.level.value,
'message': alert.message,
'resource': alert.resource_uri,
'details': alert.details
}) + '\n')
# 使用例
alert_handler = AlertHandler()
alert_handler.register_handler(console_alert_handler)
alert_handler.register_handler(log_alert_handler)
monitored_verifier = MonitoredVerifier(alert_handler)
# 検証実行(アラートが自動で送信される)
is_valid, result = monitored_verifier.verify_resource_monitored(
resource, content, max_age_hours=24
)
# アラートサマリー
summary = alert_handler.get_alert_summary()
print("\nAlert Summary:")
print(json.dumps(summary, indent=2))
8. ベストプラクティス
8.1. セキュリティ
-
常にTLS/HTTPSを使用
- ハッシュ検証は転送中の改ざんを検出するが、盗聴は防げない
- ネットワーク層の暗号化と併用する
-
適切なハッシュアルゴリズムの選択
- SHA-256以上を使用
- MD5やSHA-1は避ける
-
タイムスタンプの保護
- 信頼できる時刻ソース(NTP)を使用
- 可能であれば署名で保護
8.2. パフォーマンス
-
キャッシングの活用
- 検証済みコンテンツはキャッシュ
- 適切なTTLを設定
-
非同期処理
- 大きなコンテンツは非同期で検証
- 複数リソースの並行処理
-
段階的検証
- 小さなコンテンツ: 即座に検証
- 大きなコンテンツ: チャンク単位で検証
8.3. 運用
-
監視とアラート
- 検証失敗率を監視
- 異常なパターンを検出
-
ログと監査
- すべての検証結果を記録
- 定期的にレポートを生成
-
段階的な導入
- まず警告モードで運用
- 問題がないことを確認してから強制モードに
まとめ
MCPにおけるコンテンツの完全性と鮮度の検証は、信頼性の高いLLMアプリケーションの構築に不可欠です。
重要なポイント:
-
ハッシュ値による完全性検証
- データ転送中の改ざんや破損を検出
- SHA-256以上のアルゴリズムを使用
- メタデータとして送信
-
タイムスタンプによる鮮度検証
- 古いデータの利用を防止
- TTLや有効期限を管理
- 警告付きで柔軟に対応
-
実装の考慮事項
- パフォーマンスへの影響
- キャッシングによる最適化
- 監視とアラートの重要性
制限事項:
- MCPの標準仕様ではない(拡張が必要)
- ネットワーク層のセキュリティと併用が必須
- 完全な改ざん防止には署名やPKIが必要
実装にあたっては、システムの要件に応じて、適切なレベルの検証を選択してください。
参考情報
免責事項: 本記事は技術的な実装例を示すものであり、特定のセキュリティソリューションを保証するものではありません。実際の運用にあたっては、セキュリティ専門家への相談をお勧めします。
注意: MCPはAnthropicが開発した比較的新しいプロトコルです。最新の情報については、公式ドキュメントを参照してください。