0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MCPの活用や応用への考察 - MCPデータのクエリを最適化する:効率的な情報取得テクニック

Last updated at Posted at 2025-09-27

はじめに

Model Context Protocol(MCP)アプリケーションにおいて、データクエリの効率性は応答速度とユーザー体験に直結します。MCPサーバーが管理する大量のリソースから必要な情報を素早く取得するには、戦略的なクエリ最適化が不可欠です。

本記事では、MCPデータクエリのパフォーマンスを向上させる実践的なテクニックを、具体的な実装例とともに解説します。

1. MCPデータクエリの基礎理解

1.1 MCPのデータアクセスパターン

MCPでは、以下の方法でデータにアクセスします:

// リソース一覧の取得
const resources = await client.listResources();

// 特定リソースの読み取り
const content = await client.readResource("file://docs/api-spec.md");

// ツールの実行
const result = await client.callTool("search_database", {
  query: "user analytics",
  limit: 10
});

1.2 クエリパフォーマンスに影響する要因

  • データ量: リソースサイズとレコード数
  • クエリ複雑度: フィルタリング条件の数と種類
  • ネットワーク遅延: サーバー-クライアント間の通信
  • データソースの特性: ファイルシステム、データベース、API等

2. リソースクエリの最適化戦略

2.1 効率的なリソース設計

適切な粒度での分割

class OptimizedMCPServer:
    def list_resources(self) -> list:
        """効率的なリソース構造の設計例"""
        return [
            # ❌ 巨大すぎるリソース
            # {"uri": "database://all_data", "name": "Complete Database"}
            
            # ✅ 適切に分割されたリソース
            {
                "uri": "database://users/active",
                "name": "Active Users",
                "description": "Currently active user profiles",
                "mimeType": "application/json"
            },
            {
                "uri": "database://products/category/{category_id}",
                "name": "Products by Category",
                "description": "Products filtered by category ID",
                "mimeType": "application/json"
            },
            {
                "uri": "logs://app/recent/{hours}",
                "name": "Recent Application Logs",
                "description": "Application logs from last N hours",
                "mimeType": "text/plain"
            }
        ]

2.2 パラメータ化によるデータ絞り込み

import re
from datetime import datetime, timedelta
from urllib.parse import parse_qs, urlparse

class ParameterizedMCPServer:
    def read_resource(self, uri: str) -> dict:
        """URIパラメータに基づく動的データ取得"""
        
        # URIの解析
        parsed = urlparse(uri)
        path_parts = parsed.path.split('/')
        query_params = parse_qs(parsed.query)
        
        if 'logs' in path_parts:
            return self._get_filtered_logs(path_parts, query_params)
        elif 'users' in path_parts:
            return self._get_filtered_users(path_parts, query_params)
        elif 'products' in path_parts:
            return self._get_filtered_products(path_parts, query_params)
            
        return self._get_default_resource(uri)
    
    def _get_filtered_logs(self, path_parts: list, params: dict) -> dict:
        """ログデータの効率的取得"""
        
        # 時間範囲の指定
        hours = int(path_parts[-1]) if path_parts[-1].isdigit() else 24
        since = datetime.now() - timedelta(hours=hours)
        
        # レベルフィルタの適用
        level_filter = params.get('level', ['INFO'])[0].upper()
        
        # 最大件数の制限
        limit = min(int(params.get('limit', [1000])[0]), 10000)
        
        # 効率的なクエリ実行
        logs = self.log_repository.get_logs(
            since=since,
            level=level_filter,
            limit=limit
        )
        
        return {
            "contents": [{
                "type": "text",
                "text": f"Retrieved {len(logs)} log entries from {since.isoformat()}\n" +
                       "\n".join([self._format_log_entry(log) for log in logs])
            }]
        }
    
    def _get_filtered_users(self, path_parts: list, params: dict) -> dict:
        """ユーザーデータの効率的取得"""
        
        # フィルタ条件の抽出
        status = params.get('status', ['active'])[0]
        fields = params.get('fields', ['id', 'name', 'email'])
        limit = min(int(params.get('limit', [100])[0]), 1000)
        
        # インデックスを活用したクエリ
        users = self.user_repository.get_users(
            status=status,
            fields=fields,
            limit=limit
        )
        
        return {
            "contents": [{
                "type": "text", 
                "text": json.dumps({
                    "total": len(users),
                    "users": users
                }, indent=2)
            }]
        }

3. データベースクエリの最適化

3.1 インデックス戦略

-- MCPアクセスパターンに最適化されたインデックス

-- 時系列データ用インデックス
CREATE INDEX idx_logs_timestamp_level 
ON application_logs(timestamp DESC, log_level)
WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '7 days';

-- ユーザー検索用複合インデックス
CREATE INDEX idx_users_status_name 
ON users(status, name) 
WHERE status IN ('active', 'premium');

-- 製品カテゴリ用インデックス
CREATE INDEX idx_products_category_price 
ON products(category_id, price DESC, created_at DESC)
WHERE is_active = true;

-- 全文検索用インデックス
CREATE INDEX idx_documents_fts 
ON documents 
USING GIN(to_tsvector('english', title || ' ' || content));

3.2 効率的なクエリパターン

class DatabaseOptimizedQueries:
    def __init__(self):
        self.connection_pool = self._create_pool()
    
    def search_documents(self, query: str, limit: int = 50) -> list:
        """全文検索の最適化例"""
        
        # プリペアードステートメント使用
        sql = """
        SELECT 
            id, title, summary,
            ts_rank(search_vector, plainto_tsquery($1)) as rank
        FROM documents 
        WHERE search_vector @@ plainto_tsquery($1)
            AND is_published = true
        ORDER BY rank DESC, updated_at DESC
        LIMIT $2
        """
        
        with self.connection_pool.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, [query, limit])
            return cursor.fetchall()
    
    def get_user_analytics(self, user_ids: list, date_range: tuple) -> dict:
        """バッチクエリによる効率化"""
        
        # IN句を使用した効率的なバッチ取得
        placeholders = ','.join(['$' + str(i+3) for i in range(len(user_ids))])
        
        sql = f"""
        SELECT 
            user_id,
            COUNT(*) as action_count,
            MAX(created_at) as last_activity,
            AVG(session_duration) as avg_session_duration
        FROM user_activities 
        WHERE user_id IN ({placeholders})
            AND created_at BETWEEN $1 AND $2
        GROUP BY user_id
        """
        
        params = [date_range[0], date_range[1]] + user_ids
        
        with self.connection_pool.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            results = cursor.fetchall()
            
        return {
            row['user_id']: {
                'action_count': row['action_count'],
                'last_activity': row['last_activity'].isoformat(),
                'avg_session_duration': float(row['avg_session_duration'] or 0)
            }
            for row in results
        }

4. キャッシング戦略

4.1 多層キャッシュシステム

import redis
import json
from functools import wraps
from typing import Optional, Any

class MultiLayerCache:
    def __init__(self):
        # L1: メモリキャッシュ(高速だが容量小)
        self.memory_cache = {}
        self.memory_cache_max_size = 1000
        
        # L2: Redisキャッシュ(中速で大容量)
        self.redis_client = redis.Redis(
            host='localhost', 
            port=6379, 
            decode_responses=True
        )
        
    def cache_query_result(self, cache_key: str, ttl: int = 300):
        """クエリ結果キャッシュデコレータ"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # L1キャッシュチェック
                if cache_key in self.memory_cache:
                    return self.memory_cache[cache_key]
                
                # L2キャッシュチェック
                cached_result = self.redis_client.get(cache_key)
                if cached_result:
                    result = json.loads(cached_result)
                    self._update_memory_cache(cache_key, result)
                    return result
                
                # キャッシュミス:実際にクエリ実行
                result = func(*args, **kwargs)
                
                # 両方のキャッシュに保存
                self._save_to_cache(cache_key, result, ttl)
                
                return result
            return wrapper
        return decorator
    
    def _update_memory_cache(self, key: str, value: Any):
        """メモリキャッシュの更新(LRU)"""
        if len(self.memory_cache) >= self.memory_cache_max_size:
            # 最も古いエントリを削除
            oldest_key = next(iter(self.memory_cache))
            del self.memory_cache[oldest_key]
        
        self.memory_cache[key] = value
    
    def _save_to_cache(self, key: str, value: Any, ttl: int):
        """両方のキャッシュに保存"""
        # メモリキャッシュ
        self._update_memory_cache(key, value)
        
        # Redisキャッシュ
        self.redis_client.setex(
            key, 
            ttl, 
            json.dumps(value, default=str)
        )

# 使用例
class CachedMCPServer:
    def __init__(self):
        self.cache = MultiLayerCache()
        self.db = DatabaseOptimizedQueries()
    
    def call_tool(self, name: str, arguments: dict) -> list:
        if name == "search_products":
            return self._search_products_cached(
                arguments.get("category"),
                arguments.get("query", ""),
                arguments.get("limit", 20)
            )
    
    @MultiLayerCache.cache_query_result("products_search_{category}_{query}_{limit}", ttl=600)
    def _search_products_cached(self, category: str, query: str, limit: int) -> list:
        """キャッシュされた製品検索"""
        
        results = self.db.search_products(
            category=category,
            query=query, 
            limit=limit
        )
        
        return [{
            "type": "text",
            "text": json.dumps({
                "category": category,
                "query": query,
                "total_results": len(results),
                "products": results
            }, indent=2)
        }]

4.2 スマートキャッシュ無効化

class SmartCacheInvalidation:
    def __init__(self, cache: MultiLayerCache):
        self.cache = cache
        self.cache_dependencies = {}
    
    def register_dependency(self, cache_key: str, dependencies: list):
        """キャッシュキーと依存関係の登録"""
        for dep in dependencies:
            if dep not in self.cache_dependencies:
                self.cache_dependencies[dep] = set()
            self.cache_dependencies[dep].add(cache_key)
    
    def invalidate_by_dependency(self, dependency: str):
        """依存関係に基づくキャッシュ無効化"""
        if dependency in self.cache_dependencies:
            for cache_key in self.cache_dependencies[dependency]:
                self._invalidate_cache_key(cache_key)
    
    def _invalidate_cache_key(self, cache_key: str):
        """特定キーのキャッシュ無効化"""
        # メモリキャッシュから削除
        self.cache.memory_cache.pop(cache_key, None)
        
        # Redisキャッシュから削除
        self.cache.redis_client.delete(cache_key)

# データ更新時の自動無効化
class DataUpdateHandler:
    def __init__(self, cache_invalidator: SmartCacheInvalidation):
        self.cache_invalidator = cache_invalidator
    
    def update_product(self, product_id: str, updates: dict):
        """製品更新時のキャッシュ無効化"""
        
        # データベース更新
        self.db.update_product(product_id, updates)
        
        # 関連キャッシュを無効化
        self.cache_invalidator.invalidate_by_dependency(f"product_{product_id}")
        self.cache_invalidator.invalidate_by_dependency("product_list")
        
        if 'category_id' in updates:
            category_id = updates['category_id']
            self.cache_invalidator.invalidate_by_dependency(f"category_{category_id}")

5. 非同期処理によるクエリ並列化

5.1 並列データ取得

import asyncio
import aioredis
import asyncpg

class AsyncQueryOptimizer:
    def __init__(self):
        self.db_pool = None
        self.redis_pool = None
    
    async def initialize(self):
        """非同期コネクションプールの初期化"""
        self.db_pool = await asyncpg.create_pool(
            "postgresql://user:pass@localhost/mcpdb",
            min_size=5,
            max_size=20
        )
        self.redis_pool = aioredis.ConnectionPool.from_url("redis://localhost")
    
    async def parallel_data_fetch(self, queries: list) -> dict:
        """複数クエリの並列実行"""
        tasks = []
        
        for query_config in queries:
            if query_config['type'] == 'database':
                task = self._execute_db_query(query_config)
            elif query_config['type'] == 'cache':
                task = self._get_from_cache(query_config)
            elif query_config['type'] == 'api':
                task = self._fetch_from_api(query_config)
            
            tasks.append(task)
        
        # 全てのクエリを並列実行
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 結果の整理
        formatted_results = {}
        for i, result in enumerate(results):
            query_name = queries[i]['name']
            if isinstance(result, Exception):
                formatted_results[query_name] = {"error": str(result)}
            else:
                formatted_results[query_name] = result
        
        return formatted_results
    
    async def _execute_db_query(self, config: dict) -> dict:
        """データベースクエリの非同期実行"""
        async with self.db_pool.acquire() as conn:
            result = await conn.fetch(
                config['sql'], 
                *config.get('params', [])
            )
            return [dict(row) for row in result]
    
    async def _get_from_cache(self, config: dict) -> dict:
        """キャッシュからの非同期取得"""
        redis = aioredis.Redis(connection_pool=self.redis_pool)
        cached_data = await redis.get(config['key'])
        
        if cached_data:
            return json.loads(cached_data)
        else:
            return None
    
    async def _fetch_from_api(self, config: dict) -> dict:
        """外部APIからの非同期取得"""
        async with aiohttp.ClientSession() as session:
            async with session.get(
                config['url'], 
                params=config.get('params', {})
            ) as response:
                return await response.json()

# MCPサーバーでの使用例
class AsyncMCPServer:
    def __init__(self):
        self.query_optimizer = AsyncQueryOptimizer()
    
    async def call_tool(self, name: str, arguments: dict) -> list:
        if name == "get_dashboard_data":
            return await self._get_dashboard_data(arguments.get("user_id"))
    
    async def _get_dashboard_data(self, user_id: str) -> list:
        """ダッシュボード用データの並列取得"""
        
        queries = [
            {
                'name': 'user_profile',
                'type': 'database',
                'sql': 'SELECT * FROM users WHERE id = $1',
                'params': [user_id]
            },
            {
                'name': 'recent_activities',
                'type': 'database', 
                'sql': '''SELECT * FROM activities 
                         WHERE user_id = $1 
                         ORDER BY created_at DESC LIMIT 10''',
                'params': [user_id]
            },
            {
                'name': 'cached_stats',
                'type': 'cache',
                'key': f'user_stats_{user_id}'
            },
            {
                'name': 'external_data',
                'type': 'api',
                'url': 'https://api.example.com/user-data',
                'params': {'user_id': user_id}
            }
        ]
        
        results = await self.query_optimizer.parallel_data_fetch(queries)
        
        return [{
            "type": "text",
            "text": json.dumps({
                "user_id": user_id,
                "dashboard_data": results,
                "fetched_at": datetime.now().isoformat()
            }, indent=2)
        }]

6. クエリパフォーマンス監視

6.1 実行時間の測定と分析

import time
import statistics
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class QueryMetrics:
    execution_times: List[float]
    cache_hit_rate: float
    error_count: int
    last_executed: datetime

class QueryPerformanceMonitor:
    def __init__(self, max_history: int = 1000):
        self.metrics: Dict[str, QueryMetrics] = defaultdict(
            lambda: QueryMetrics(deque(maxlen=max_history), 0.0, 0, None)
        )
        self.cache_stats = {"hits": 0, "misses": 0}
    
    def measure_query(self, query_name: str):
        """クエリ実行時間測定デコレータ"""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                start_time = time.time()
                
                try:
                    result = func(*args, **kwargs)
                    execution_time = time.time() - start_time
                    
                    # メトリクス更新
                    self.metrics[query_name].execution_times.append(execution_time)
                    self.metrics[query_name].last_executed = datetime.now()
                    
                    # 遅いクエリの警告
                    if execution_time > 1.0:
                        logging.warning(
                            f"Slow query detected: {query_name} took {execution_time:.2f}s"
                        )
                    
                    return result
                    
                except Exception as e:
                    self.metrics[query_name].error_count += 1
                    logging.error(f"Query {query_name} failed: {str(e)}")
                    raise
                    
            return wrapper
        return decorator
    
    def record_cache_hit(self, hit: bool):
        """キャッシュヒット率の記録"""
        if hit:
            self.cache_stats["hits"] += 1
        else:
            self.cache_stats["misses"] += 1
    
    def get_performance_report(self) -> dict:
        """パフォーマンスレポートの生成"""
        report = {
            "cache_hit_rate": self._calculate_cache_hit_rate(),
            "query_performance": {}
        }
        
        for query_name, metrics in self.metrics.items():
            if metrics.execution_times:
                times = list(metrics.execution_times)
                report["query_performance"][query_name] = {
                    "avg_time": statistics.mean(times),
                    "median_time": statistics.median(times),
                    "p95_time": self._percentile(times, 95),
                    "max_time": max(times),
                    "total_executions": len(times),
                    "error_count": metrics.error_count,
                    "last_executed": metrics.last_executed.isoformat() if metrics.last_executed else None
                }
        
        return report
    
    def _calculate_cache_hit_rate(self) -> float:
        """キャッシュヒット率の計算"""
        total = self.cache_stats["hits"] + self.cache_stats["misses"]
        if total == 0:
            return 0.0
        return self.cache_stats["hits"] / total * 100
    
    def _percentile(self, data: list, percentile: int) -> float:
        """パーセンタイル値の計算"""
        sorted_data = sorted(data)
        index = int(len(sorted_data) * percentile / 100)
        return sorted_data[min(index, len(sorted_data) - 1)]
    
    def identify_bottlenecks(self) -> list:
        """ボトルネックの特定"""
        bottlenecks = []
        
        for query_name, metrics in self.metrics.items():
            if not metrics.execution_times:
                continue
                
            times = list(metrics.execution_times)
            avg_time = statistics.mean(times)
            
            # 平均実行時間が1秒以上のクエリ
            if avg_time > 1.0:
                bottlenecks.append({
                    "query": query_name,
                    "avg_time": avg_time,
                    "executions": len(times),
                    "issue": "slow_execution"
                })
            
            # エラー率が5%以上のクエリ
            error_rate = metrics.error_count / len(times) * 100
            if error_rate > 5.0:
                bottlenecks.append({
                    "query": query_name,
                    "error_rate": error_rate,
                    "issue": "high_error_rate"
                })
        
        return sorted(bottlenecks, key=lambda x: x.get("avg_time", 0), reverse=True)

7. 実践的な最適化チェックリスト

データ設計レベル

  • リソースURIの適切な階層化と分割
  • パラメータ化による動的フィルタリング
  • データベースインデックスの最適化
  • 不要なデータの除外(フィールド選択)

キャッシュレベル

  • 多層キャッシュ戦略の実装
  • 適切なTTL設定
  • キャッシュ無効化戦略
  • キャッシュヒット率の監視

クエリ実行レベル

  • プリペアードステートメントの使用
  • 並列処理による高速化
  • 接続プールの最適化
  • 実行計画の分析と改善

監視・運用レベル

  • クエリ実行時間の計測
  • ボトルネックの特定と対策
  • アラート設定
  • 定期的なパフォーマンス分析

まとめ

MCPデータクエリの最適化は、アプリケーションのレスポンシブ性とスケーラビリティに直結する重要な要素です。本記事で紹介したテクニックを組み合わせることで:

  • 応答時間の大幅短縮(数秒 → 数百ミリ秒)
  • サーバーリソースの効率的利用
  • ユーザー体験の向上
  • 運用コストの削減

を実現できます。

重要なのは、システムの特性とユーザーのアクセスパターンを理解し、段階的に最適化を進めることです。継続的な監視と改善により、常に高いパフォーマンスを維持しましょう。


注意: MCPはAnthropicが開発した比較的新しいプロトコルです。最新の情報については、公式ドキュメントを参照してください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?