はじめに
Model Context Protocol(MCP)アプリケーションにおいて、データクエリの効率性は応答速度とユーザー体験に直結します。MCPサーバーが管理する大量のリソースから必要な情報を素早く取得するには、戦略的なクエリ最適化が不可欠です。
本記事では、MCPデータクエリのパフォーマンスを向上させる実践的なテクニックを、具体的な実装例とともに解説します。
1. MCPデータクエリの基礎理解
1.1 MCPのデータアクセスパターン
MCPでは、以下の方法でデータにアクセスします:
// リソース一覧の取得
const resources = await client.listResources();
// 特定リソースの読み取り
const content = await client.readResource("file://docs/api-spec.md");
// ツールの実行
const result = await client.callTool("search_database", {
query: "user analytics",
limit: 10
});
1.2 クエリパフォーマンスに影響する要因
- データ量: リソースサイズとレコード数
- クエリ複雑度: フィルタリング条件の数と種類
- ネットワーク遅延: サーバー-クライアント間の通信
- データソースの特性: ファイルシステム、データベース、API等
2. リソースクエリの最適化戦略
2.1 効率的なリソース設計
適切な粒度での分割
class OptimizedMCPServer:
def list_resources(self) -> list:
"""効率的なリソース構造の設計例"""
return [
# ❌ 巨大すぎるリソース
# {"uri": "database://all_data", "name": "Complete Database"}
# ✅ 適切に分割されたリソース
{
"uri": "database://users/active",
"name": "Active Users",
"description": "Currently active user profiles",
"mimeType": "application/json"
},
{
"uri": "database://products/category/{category_id}",
"name": "Products by Category",
"description": "Products filtered by category ID",
"mimeType": "application/json"
},
{
"uri": "logs://app/recent/{hours}",
"name": "Recent Application Logs",
"description": "Application logs from last N hours",
"mimeType": "text/plain"
}
]
2.2 パラメータ化によるデータ絞り込み
import re
from datetime import datetime, timedelta
from urllib.parse import parse_qs, urlparse
class ParameterizedMCPServer:
def read_resource(self, uri: str) -> dict:
"""URIパラメータに基づく動的データ取得"""
# URIの解析
parsed = urlparse(uri)
path_parts = parsed.path.split('/')
query_params = parse_qs(parsed.query)
if 'logs' in path_parts:
return self._get_filtered_logs(path_parts, query_params)
elif 'users' in path_parts:
return self._get_filtered_users(path_parts, query_params)
elif 'products' in path_parts:
return self._get_filtered_products(path_parts, query_params)
return self._get_default_resource(uri)
def _get_filtered_logs(self, path_parts: list, params: dict) -> dict:
"""ログデータの効率的取得"""
# 時間範囲の指定
hours = int(path_parts[-1]) if path_parts[-1].isdigit() else 24
since = datetime.now() - timedelta(hours=hours)
# レベルフィルタの適用
level_filter = params.get('level', ['INFO'])[0].upper()
# 最大件数の制限
limit = min(int(params.get('limit', [1000])[0]), 10000)
# 効率的なクエリ実行
logs = self.log_repository.get_logs(
since=since,
level=level_filter,
limit=limit
)
return {
"contents": [{
"type": "text",
"text": f"Retrieved {len(logs)} log entries from {since.isoformat()}\n" +
"\n".join([self._format_log_entry(log) for log in logs])
}]
}
def _get_filtered_users(self, path_parts: list, params: dict) -> dict:
"""ユーザーデータの効率的取得"""
# フィルタ条件の抽出
status = params.get('status', ['active'])[0]
fields = params.get('fields', ['id', 'name', 'email'])
limit = min(int(params.get('limit', [100])[0]), 1000)
# インデックスを活用したクエリ
users = self.user_repository.get_users(
status=status,
fields=fields,
limit=limit
)
return {
"contents": [{
"type": "text",
"text": json.dumps({
"total": len(users),
"users": users
}, indent=2)
}]
}
3. データベースクエリの最適化
3.1 インデックス戦略
-- MCPアクセスパターンに最適化されたインデックス
-- 時系列データ用インデックス
CREATE INDEX idx_logs_timestamp_level
ON application_logs(timestamp DESC, log_level)
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '7 days';
-- ユーザー検索用複合インデックス
CREATE INDEX idx_users_status_name
ON users(status, name)
WHERE status IN ('active', 'premium');
-- 製品カテゴリ用インデックス
CREATE INDEX idx_products_category_price
ON products(category_id, price DESC, created_at DESC)
WHERE is_active = true;
-- 全文検索用インデックス
CREATE INDEX idx_documents_fts
ON documents
USING GIN(to_tsvector('english', title || ' ' || content));
3.2 効率的なクエリパターン
class DatabaseOptimizedQueries:
def __init__(self):
self.connection_pool = self._create_pool()
def search_documents(self, query: str, limit: int = 50) -> list:
"""全文検索の最適化例"""
# プリペアードステートメント使用
sql = """
SELECT
id, title, summary,
ts_rank(search_vector, plainto_tsquery($1)) as rank
FROM documents
WHERE search_vector @@ plainto_tsquery($1)
AND is_published = true
ORDER BY rank DESC, updated_at DESC
LIMIT $2
"""
with self.connection_pool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(sql, [query, limit])
return cursor.fetchall()
def get_user_analytics(self, user_ids: list, date_range: tuple) -> dict:
"""バッチクエリによる効率化"""
# IN句を使用した効率的なバッチ取得
placeholders = ','.join(['$' + str(i+3) for i in range(len(user_ids))])
sql = f"""
SELECT
user_id,
COUNT(*) as action_count,
MAX(created_at) as last_activity,
AVG(session_duration) as avg_session_duration
FROM user_activities
WHERE user_id IN ({placeholders})
AND created_at BETWEEN $1 AND $2
GROUP BY user_id
"""
params = [date_range[0], date_range[1]] + user_ids
with self.connection_pool.get_connection() as conn:
cursor = conn.cursor()
cursor.execute(sql, params)
results = cursor.fetchall()
return {
row['user_id']: {
'action_count': row['action_count'],
'last_activity': row['last_activity'].isoformat(),
'avg_session_duration': float(row['avg_session_duration'] or 0)
}
for row in results
}
4. キャッシング戦略
4.1 多層キャッシュシステム
import redis
import json
from functools import wraps
from typing import Optional, Any
class MultiLayerCache:
def __init__(self):
# L1: メモリキャッシュ(高速だが容量小)
self.memory_cache = {}
self.memory_cache_max_size = 1000
# L2: Redisキャッシュ(中速で大容量)
self.redis_client = redis.Redis(
host='localhost',
port=6379,
decode_responses=True
)
def cache_query_result(self, cache_key: str, ttl: int = 300):
"""クエリ結果キャッシュデコレータ"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# L1キャッシュチェック
if cache_key in self.memory_cache:
return self.memory_cache[cache_key]
# L2キャッシュチェック
cached_result = self.redis_client.get(cache_key)
if cached_result:
result = json.loads(cached_result)
self._update_memory_cache(cache_key, result)
return result
# キャッシュミス:実際にクエリ実行
result = func(*args, **kwargs)
# 両方のキャッシュに保存
self._save_to_cache(cache_key, result, ttl)
return result
return wrapper
return decorator
def _update_memory_cache(self, key: str, value: Any):
"""メモリキャッシュの更新(LRU)"""
if len(self.memory_cache) >= self.memory_cache_max_size:
# 最も古いエントリを削除
oldest_key = next(iter(self.memory_cache))
del self.memory_cache[oldest_key]
self.memory_cache[key] = value
def _save_to_cache(self, key: str, value: Any, ttl: int):
"""両方のキャッシュに保存"""
# メモリキャッシュ
self._update_memory_cache(key, value)
# Redisキャッシュ
self.redis_client.setex(
key,
ttl,
json.dumps(value, default=str)
)
# 使用例
class CachedMCPServer:
def __init__(self):
self.cache = MultiLayerCache()
self.db = DatabaseOptimizedQueries()
def call_tool(self, name: str, arguments: dict) -> list:
if name == "search_products":
return self._search_products_cached(
arguments.get("category"),
arguments.get("query", ""),
arguments.get("limit", 20)
)
@MultiLayerCache.cache_query_result("products_search_{category}_{query}_{limit}", ttl=600)
def _search_products_cached(self, category: str, query: str, limit: int) -> list:
"""キャッシュされた製品検索"""
results = self.db.search_products(
category=category,
query=query,
limit=limit
)
return [{
"type": "text",
"text": json.dumps({
"category": category,
"query": query,
"total_results": len(results),
"products": results
}, indent=2)
}]
4.2 スマートキャッシュ無効化
class SmartCacheInvalidation:
def __init__(self, cache: MultiLayerCache):
self.cache = cache
self.cache_dependencies = {}
def register_dependency(self, cache_key: str, dependencies: list):
"""キャッシュキーと依存関係の登録"""
for dep in dependencies:
if dep not in self.cache_dependencies:
self.cache_dependencies[dep] = set()
self.cache_dependencies[dep].add(cache_key)
def invalidate_by_dependency(self, dependency: str):
"""依存関係に基づくキャッシュ無効化"""
if dependency in self.cache_dependencies:
for cache_key in self.cache_dependencies[dependency]:
self._invalidate_cache_key(cache_key)
def _invalidate_cache_key(self, cache_key: str):
"""特定キーのキャッシュ無効化"""
# メモリキャッシュから削除
self.cache.memory_cache.pop(cache_key, None)
# Redisキャッシュから削除
self.cache.redis_client.delete(cache_key)
# データ更新時の自動無効化
class DataUpdateHandler:
def __init__(self, cache_invalidator: SmartCacheInvalidation):
self.cache_invalidator = cache_invalidator
def update_product(self, product_id: str, updates: dict):
"""製品更新時のキャッシュ無効化"""
# データベース更新
self.db.update_product(product_id, updates)
# 関連キャッシュを無効化
self.cache_invalidator.invalidate_by_dependency(f"product_{product_id}")
self.cache_invalidator.invalidate_by_dependency("product_list")
if 'category_id' in updates:
category_id = updates['category_id']
self.cache_invalidator.invalidate_by_dependency(f"category_{category_id}")
5. 非同期処理によるクエリ並列化
5.1 並列データ取得
import asyncio
import aioredis
import asyncpg
class AsyncQueryOptimizer:
def __init__(self):
self.db_pool = None
self.redis_pool = None
async def initialize(self):
"""非同期コネクションプールの初期化"""
self.db_pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/mcpdb",
min_size=5,
max_size=20
)
self.redis_pool = aioredis.ConnectionPool.from_url("redis://localhost")
async def parallel_data_fetch(self, queries: list) -> dict:
"""複数クエリの並列実行"""
tasks = []
for query_config in queries:
if query_config['type'] == 'database':
task = self._execute_db_query(query_config)
elif query_config['type'] == 'cache':
task = self._get_from_cache(query_config)
elif query_config['type'] == 'api':
task = self._fetch_from_api(query_config)
tasks.append(task)
# 全てのクエリを並列実行
results = await asyncio.gather(*tasks, return_exceptions=True)
# 結果の整理
formatted_results = {}
for i, result in enumerate(results):
query_name = queries[i]['name']
if isinstance(result, Exception):
formatted_results[query_name] = {"error": str(result)}
else:
formatted_results[query_name] = result
return formatted_results
async def _execute_db_query(self, config: dict) -> dict:
"""データベースクエリの非同期実行"""
async with self.db_pool.acquire() as conn:
result = await conn.fetch(
config['sql'],
*config.get('params', [])
)
return [dict(row) for row in result]
async def _get_from_cache(self, config: dict) -> dict:
"""キャッシュからの非同期取得"""
redis = aioredis.Redis(connection_pool=self.redis_pool)
cached_data = await redis.get(config['key'])
if cached_data:
return json.loads(cached_data)
else:
return None
async def _fetch_from_api(self, config: dict) -> dict:
"""外部APIからの非同期取得"""
async with aiohttp.ClientSession() as session:
async with session.get(
config['url'],
params=config.get('params', {})
) as response:
return await response.json()
# MCPサーバーでの使用例
class AsyncMCPServer:
def __init__(self):
self.query_optimizer = AsyncQueryOptimizer()
async def call_tool(self, name: str, arguments: dict) -> list:
if name == "get_dashboard_data":
return await self._get_dashboard_data(arguments.get("user_id"))
async def _get_dashboard_data(self, user_id: str) -> list:
"""ダッシュボード用データの並列取得"""
queries = [
{
'name': 'user_profile',
'type': 'database',
'sql': 'SELECT * FROM users WHERE id = $1',
'params': [user_id]
},
{
'name': 'recent_activities',
'type': 'database',
'sql': '''SELECT * FROM activities
WHERE user_id = $1
ORDER BY created_at DESC LIMIT 10''',
'params': [user_id]
},
{
'name': 'cached_stats',
'type': 'cache',
'key': f'user_stats_{user_id}'
},
{
'name': 'external_data',
'type': 'api',
'url': 'https://api.example.com/user-data',
'params': {'user_id': user_id}
}
]
results = await self.query_optimizer.parallel_data_fetch(queries)
return [{
"type": "text",
"text": json.dumps({
"user_id": user_id,
"dashboard_data": results,
"fetched_at": datetime.now().isoformat()
}, indent=2)
}]
6. クエリパフォーマンス監視
6.1 実行時間の測定と分析
import time
import statistics
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Dict, List
@dataclass
class QueryMetrics:
execution_times: List[float]
cache_hit_rate: float
error_count: int
last_executed: datetime
class QueryPerformanceMonitor:
def __init__(self, max_history: int = 1000):
self.metrics: Dict[str, QueryMetrics] = defaultdict(
lambda: QueryMetrics(deque(maxlen=max_history), 0.0, 0, None)
)
self.cache_stats = {"hits": 0, "misses": 0}
def measure_query(self, query_name: str):
"""クエリ実行時間測定デコレータ"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
execution_time = time.time() - start_time
# メトリクス更新
self.metrics[query_name].execution_times.append(execution_time)
self.metrics[query_name].last_executed = datetime.now()
# 遅いクエリの警告
if execution_time > 1.0:
logging.warning(
f"Slow query detected: {query_name} took {execution_time:.2f}s"
)
return result
except Exception as e:
self.metrics[query_name].error_count += 1
logging.error(f"Query {query_name} failed: {str(e)}")
raise
return wrapper
return decorator
def record_cache_hit(self, hit: bool):
"""キャッシュヒット率の記録"""
if hit:
self.cache_stats["hits"] += 1
else:
self.cache_stats["misses"] += 1
def get_performance_report(self) -> dict:
"""パフォーマンスレポートの生成"""
report = {
"cache_hit_rate": self._calculate_cache_hit_rate(),
"query_performance": {}
}
for query_name, metrics in self.metrics.items():
if metrics.execution_times:
times = list(metrics.execution_times)
report["query_performance"][query_name] = {
"avg_time": statistics.mean(times),
"median_time": statistics.median(times),
"p95_time": self._percentile(times, 95),
"max_time": max(times),
"total_executions": len(times),
"error_count": metrics.error_count,
"last_executed": metrics.last_executed.isoformat() if metrics.last_executed else None
}
return report
def _calculate_cache_hit_rate(self) -> float:
"""キャッシュヒット率の計算"""
total = self.cache_stats["hits"] + self.cache_stats["misses"]
if total == 0:
return 0.0
return self.cache_stats["hits"] / total * 100
def _percentile(self, data: list, percentile: int) -> float:
"""パーセンタイル値の計算"""
sorted_data = sorted(data)
index = int(len(sorted_data) * percentile / 100)
return sorted_data[min(index, len(sorted_data) - 1)]
def identify_bottlenecks(self) -> list:
"""ボトルネックの特定"""
bottlenecks = []
for query_name, metrics in self.metrics.items():
if not metrics.execution_times:
continue
times = list(metrics.execution_times)
avg_time = statistics.mean(times)
# 平均実行時間が1秒以上のクエリ
if avg_time > 1.0:
bottlenecks.append({
"query": query_name,
"avg_time": avg_time,
"executions": len(times),
"issue": "slow_execution"
})
# エラー率が5%以上のクエリ
error_rate = metrics.error_count / len(times) * 100
if error_rate > 5.0:
bottlenecks.append({
"query": query_name,
"error_rate": error_rate,
"issue": "high_error_rate"
})
return sorted(bottlenecks, key=lambda x: x.get("avg_time", 0), reverse=True)
7. 実践的な最適化チェックリスト
データ設計レベル
- リソースURIの適切な階層化と分割
- パラメータ化による動的フィルタリング
- データベースインデックスの最適化
- 不要なデータの除外(フィールド選択)
キャッシュレベル
- 多層キャッシュ戦略の実装
- 適切なTTL設定
- キャッシュ無効化戦略
- キャッシュヒット率の監視
クエリ実行レベル
- プリペアードステートメントの使用
- 並列処理による高速化
- 接続プールの最適化
- 実行計画の分析と改善
監視・運用レベル
- クエリ実行時間の計測
- ボトルネックの特定と対策
- アラート設定
- 定期的なパフォーマンス分析
まとめ
MCPデータクエリの最適化は、アプリケーションのレスポンシブ性とスケーラビリティに直結する重要な要素です。本記事で紹介したテクニックを組み合わせることで:
- 応答時間の大幅短縮(数秒 → 数百ミリ秒)
- サーバーリソースの効率的利用
- ユーザー体験の向上
- 運用コストの削減
を実現できます。
重要なのは、システムの特性とユーザーのアクセスパターンを理解し、段階的に最適化を進めることです。継続的な監視と改善により、常に高いパフォーマンスを維持しましょう。
注意: MCPはAnthropicが開発した比較的新しいプロトコルです。最新の情報については、公式ドキュメントを参照してください。