0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Model Context Protocol完全解説 30日間シリーズ - Day 24【MCP実戦 #24】パフォーマンス改善:大きなファイルや多数のリクエストへの対応

Posted at

はじめに

この記事は、QiitaのModel Context Protocol(以下、MCP)解説シリーズの第24回です。
今回は、MCPサーバーを本番環境で安定して運用するためのパフォーマンス改善に焦点を当てます。特に、Claudeからの大きなデータへのアクセス頻繁なリクエストに効率的に対応する手法を解説します。

🚀 なぜパフォーマンス改善が必要なのか?

MCPサーバーは、Claudeとローカルリソースや外部サービスを繋ぐ重要な役割を果たします。しかし、不適切な実装により以下のような問題が発生することがあります:

MCPサーバー特有のパフォーマンス課題

  1. メモリ枯渇

    • 大きなファイルやデータセットを一度に処理すると、MCPサーバープロセスがメモリ不足でクラッシュ
    • 例:1GB のログファイル全体をメモリに読み込んでしまう
  2. 応答遅延

    • 重い処理(データベースクエリ、外部API呼び出し)がClaude の応答を大幅に遅延
    • 例:数万件のレコードを検索する SQL クエリが30秒かかる
  3. リソースの無駄遣い

    • 同じデータへの重複アクセス
    • 不必要に詳細すぎるデータの取得
  4. プロセス間通信のボトルネック

    • stdio を通じた大量のデータ転送によるパフォーマンス低下

🧠 パフォーマンス改善の4つの戦略

1. データの分割とページネーション(Data Chunking & Pagination)

大きなデータセットは適切な単位に分割して処理します。

import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { 
  CallToolRequestSchema, 
  ListToolsRequestSchema 
} from '@modelcontextprotocol/sdk/types.js';

interface SearchResult {
  items: any[];
  totalCount: number;
  hasMore: boolean;
  nextCursor?: string;
}

class PaginatedDataProvider {
  private readonly PAGE_SIZE = 50; // 一度に返すレコード数を制限

  async searchDatabase(
    query: string, 
    cursor?: string, 
    limit: number = this.PAGE_SIZE
  ): Promise<SearchResult> {
    // カーソルベースのページネーション実装
    const offset = cursor ? parseInt(cursor) : 0;
    const actualLimit = Math.min(limit, this.PAGE_SIZE); // 上限を設定

    try {
      // データベースクエリ(例:SQLite)
      const results = await this.executeQuery(`
        SELECT * FROM documents 
        WHERE content LIKE ? 
        ORDER BY id 
        LIMIT ? OFFSET ?
      `, [`%${query}%`, actualLimit + 1, offset]);

      const hasMore = results.length > actualLimit;
      const items = hasMore ? results.slice(0, -1) : results;
      
      return {
        items,
        totalCount: await this.getSearchTotalCount(query),
        hasMore,
        nextCursor: hasMore ? (offset + actualLimit).toString() : undefined
      };
    } catch (error) {
      console.error('Database search error:', error);
      throw error;
    }
  }

  private async getSearchTotalCount(query: string): Promise<number> {
    const result = await this.executeQuery(`
      SELECT COUNT(*) as count 
      FROM documents 
      WHERE content LIKE ?
    `, [`%${query}%`]);
    return result[0].count;
  }
}

const dataProvider = new PaginatedDataProvider();

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "search_documents") {
    const { query, cursor, limit } = args;
    const result = await dataProvider.searchDatabase(query, cursor, limit);
    
    return {
      content: [{
        type: "text",
        text: JSON.stringify({
          results: result.items,
          pagination: {
            total: result.totalCount,
            hasMore: result.hasMore,
            nextCursor: result.nextCursor
          }
        }, null, 2)
      }]
    };
  }
});

2. ファイルストリーミングとバッファリング(File Streaming & Buffering)

大きなファイルは部分的に読み込んで処理します。

import * as fs from 'fs';
import * as readline from 'readline';
import { pipeline } from 'stream/promises';

class FileProcessor {
  private readonly CHUNK_SIZE = 64 * 1024; // 64KB チャンク
  private readonly MAX_LINES_PER_REQUEST = 100;

  async readFileInChunks(
    filePath: string, 
    startLine: number = 0, 
    maxLines: number = this.MAX_LINES_PER_REQUEST
  ): Promise<{ lines: string[], hasMore: boolean, nextStartLine: number }> {
    const lines: string[] = [];
    let currentLine = 0;
    let hasMore = false;

    try {
      const fileStream = fs.createReadStream(filePath, { encoding: 'utf8' });
      const rl = readline.createInterface({
        input: fileStream,
        crlfDelay: Infinity
      });

      for await (const line of rl) {
        if (currentLine >= startLine) {
          if (lines.length >= maxLines) {
            hasMore = true;
            break;
          }
          lines.push(line);
        }
        currentLine++;
      }

      return {
        lines,
        hasMore,
        nextStartLine: startLine + lines.length
      };
    } catch (error) {
      if (error.code === 'ENOENT') {
        throw new Error(`File not found: ${filePath}`);
      }
      throw error;
    }
  }

  async searchInFile(
    filePath: string, 
    searchTerm: string, 
    maxResults: number = 50
  ): Promise<{ matches: Array<{ line: string, lineNumber: number }>, truncated: boolean }> {
    const matches: Array<{ line: string, lineNumber: number }> = [];
    let lineNumber = 0;
    let truncated = false;

    try {
      const fileStream = fs.createReadStream(filePath, { encoding: 'utf8' });
      const rl = readline.createInterface({
        input: fileStream,
        crlfDelay: Infinity
      });

      for await (const line of rl) {
        lineNumber++;
        
        if (line.toLowerCase().includes(searchTerm.toLowerCase())) {
          if (matches.length >= maxResults) {
            truncated = true;
            break;
          }
          
          matches.push({
            line: line.trim(),
            lineNumber
          });
        }
      }

      return { matches, truncated };
    } catch (error) {
      console.error(`Error searching in file ${filePath}:`, error);
      throw error;
    }
  }
}

const fileProcessor = new FileProcessor();

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "read_large_file") {
    const { filePath, startLine, maxLines } = args;
    
    const result = await fileProcessor.readFileInChunks(
      filePath, 
      startLine, 
      maxLines
    );
    
    return {
      content: [{
        type: "text",
        text: `File: ${filePath}\nLines ${startLine}-${startLine + result.lines.length - 1}:\n\n${result.lines.join('\n')}\n\n${result.hasMore ? `Has more content. Use startLine: ${result.nextStartLine}` : 'End of file reached.'}`
      }]
    };
  }

  if (name === "search_in_file") {
    const { filePath, searchTerm, maxResults } = args;
    
    const result = await fileProcessor.searchInFile(
      filePath, 
      searchTerm, 
      maxResults
    );
    
    return {
      content: [{
        type: "text",
        text: `Search results for "${searchTerm}" in ${filePath}:\n\n${result.matches.map(match => `Line ${match.lineNumber}: ${match.line}`).join('\n')}\n\n${result.truncated ? `Results truncated at ${maxResults} matches.` : `Found ${result.matches.length} matches.`}`
      }]
    };
  }
});

3. インテリジェントなキャッシング(Intelligent Caching)

頻繁にアクセスされるデータや計算結果をキャッシュします。

import { LRUCache } from 'lru-cache';
import * as crypto from 'crypto';

interface CacheEntry<T> {
  data: T;
  timestamp: number;
  ttl?: number;
}

class SmartCache {
  private memoryCache = new LRUCache<string, CacheEntry<any>>({ 
    max: 1000,
    maxSize: 100 * 1024 * 1024, // 100MB
    sizeCalculation: (value) => JSON.stringify(value).length
  });

  private persistentCache = new Map<string, CacheEntry<any>>();

  private generateKey(prefix: string, ...args: any[]): string {
    const data = JSON.stringify(args);
    const hash = crypto.createHash('md5').update(data).digest('hex');
    return `${prefix}:${hash}`;
  }

  async get<T>(
    key: string, 
    factory: () => Promise<T>, 
    ttlSeconds: number = 300
  ): Promise<T> {
    // メモリキャッシュをチェック
    const cached = this.memoryCache.get(key);
    if (cached && !this.isExpired(cached)) {
      console.log(`Cache hit: ${key}`);
      return cached.data;
    }

    // 永続キャッシュをチェック
    const persistent = this.persistentCache.get(key);
    if (persistent && !this.isExpired(persistent)) {
      console.log(`Persistent cache hit: ${key}`);
      // メモリキャッシュにも保存
      this.memoryCache.set(key, persistent);
      return persistent.data;
    }

    // キャッシュミス:新しいデータを取得
    console.log(`Cache miss: ${key}`);
    try {
      const data = await factory();
      const entry: CacheEntry<T> = {
        data,
        timestamp: Date.now(),
        ttl: ttlSeconds * 1000
      };

      this.memoryCache.set(key, entry);
      
      // 重要なデータは永続キャッシュにも保存
      if (ttlSeconds > 60) {
        this.persistentCache.set(key, entry);
      }

      return data;
    } catch (error) {
      console.error(`Error generating cache data for ${key}:`, error);
      throw error;
    }
  }

  private isExpired(entry: CacheEntry<any>): boolean {
    if (!entry.ttl) return false;
    return Date.now() - entry.timestamp > entry.ttl;
  }

  invalidate(pattern: string): void {
    // パターンに一致するキーを削除
    const regex = new RegExp(pattern);
    
    for (const key of this.memoryCache.keys()) {
      if (regex.test(key)) {
        this.memoryCache.delete(key);
      }
    }
    
    for (const key of this.persistentCache.keys()) {
      if (regex.test(key)) {
        this.persistentCache.delete(key);
      }
    }
  }

  getStats() {
    return {
      memoryCache: {
        size: this.memoryCache.size,
        calculatedSize: this.memoryCache.calculatedSize
      },
      persistentCache: {
        size: this.persistentCache.size
      }
    };
  }
}

const cache = new SmartCache();

// 使用例
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "get_weather") {
    const { city } = args;
    const cacheKey = cache.generateKey('weather', city);
    
    const weatherData = await cache.get(
      cacheKey,
      async () => {
        // 実際のWeather API呼び出し
        const response = await fetch(`https://api.weather.com/v1/current?q=${city}`);
        return await response.json();
      },
      300 // 5分間キャッシュ
    );

    return {
      content: [{
        type: "text",
        text: JSON.stringify(weatherData, null, 2)
      }]
    };
  }

  if (name === "analyze_data") {
    const { dataId } = args;
    const cacheKey = cache.generateKey('analysis', dataId);
    
    const analysis = await cache.get(
      cacheKey,
      async () => {
        // 重い分析処理
        console.log(`Performing heavy analysis for ${dataId}`);
        await new Promise(resolve => setTimeout(resolve, 5000)); // 5秒の模擬処理
        return {
          result: "Complex analysis completed",
          timestamp: new Date().toISOString(),
          dataId
        };
      },
      1800 // 30分間キャッシュ
    );

    return {
      content: [{
        type: "text",
        text: JSON.stringify(analysis, null, 2)
      }]
    };
  }
});

4. 非同期処理とリソース管理(Async Processing & Resource Management)

効率的な非同期処理とリソースの管理を実装します。

import { Worker } from 'worker_threads';
import pLimit from 'p-limit';

class ResourceManager {
  private concurrencyLimit = pLimit(10); // 同時実行数を制限
  private workerPool: Worker[] = [];
  private readonly maxWorkers = 4;

  constructor() {
    // ワーカープールの初期化
    this.initializeWorkerPool();
  }

  private initializeWorkerPool() {
    for (let i = 0; i < this.maxWorkers; i++) {
      // 実際の実装では、worker.jsファイルが必要
      // const worker = new Worker('./worker.js');
      // this.workerPool.push(worker);
    }
  }

  async processWithConcurrencyLimit<T>(
    tasks: (() => Promise<T>)[]
  ): Promise<T[]> {
    // 同時実行数を制限して処理
    const limitedTasks = tasks.map(task => 
      this.concurrencyLimit(task)
    );
    
    return Promise.all(limitedTasks);
  }

  async processLargeDataset<T, R>(
    data: T[], 
    processor: (item: T) => Promise<R>,
    batchSize: number = 10
  ): Promise<R[]> {
    const results: R[] = [];
    
    // データをバッチごとに処理
    for (let i = 0; i < data.length; i += batchSize) {
      const batch = data.slice(i, i + batchSize);
      const batchTasks = batch.map(item => processor(item));
      
      const batchResults = await this.processWithConcurrencyLimit(batchTasks);
      results.push(...batchResults);
      
      // 小さな遅延を入れてCPU負荷を制御
      if (i + batchSize < data.length) {
        await new Promise(resolve => setTimeout(resolve, 10));
      }
    }
    
    return results;
  }

  // メモリ使用量の監視
  monitorMemoryUsage() {
    const memUsage = process.memoryUsage();
    const memoryInfo = {
      rss: Math.round(memUsage.rss / 1024 / 1024), // MB
      heapUsed: Math.round(memUsage.heapUsed / 1024 / 1024), // MB
      heapTotal: Math.round(memUsage.heapTotal / 1024 / 1024), // MB
      external: Math.round(memUsage.external / 1024 / 1024) // MB
    };

    // メモリ使用量が閾値を超えた場合の警告
    if (memoryInfo.heapUsed > 512) { // 512MB を超えた場合
      console.warn('High memory usage detected:', memoryInfo);
      
      // ガベージコレクションを強制実行
      if (global.gc) {
        global.gc();
        console.log('Garbage collection executed');
      }
    }

    return memoryInfo;
  }

  async cleanup() {
    // ワーカープールのクリーンアップ
    await Promise.all(
      this.workerPool.map(worker => worker.terminate())
    );
  }
}

const resourceManager = new ResourceManager();

// メモリ監視の定期実行
setInterval(() => {
  resourceManager.monitorMemoryUsage();
}, 30000); // 30秒ごと

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  if (name === "process_large_dataset") {
    const { dataIds } = args;
    
    const results = await resourceManager.processLargeDataset(
      dataIds,
      async (id: string) => {
        // 個別のデータ処理
        const data = await fetchDataById(id);
        return processData(data);
      },
      5 // バッチサイズ
    );

    return {
      content: [{
        type: "text",
        text: `Processed ${results.length} items successfully`
      }]
    };
  }

  if (name === "parallel_api_calls") {
    const { endpoints } = args;
    
    const tasks = endpoints.map((endpoint: string) => 
      async () => {
        const response = await fetch(endpoint);
        return {
          endpoint,
          data: await response.json(),
          status: response.status
        };
      }
    );

    const results = await resourceManager.processWithConcurrencyLimit(tasks);
    
    return {
      content: [{
        type: "text",
        text: JSON.stringify(results, null, 2)
      }]
    };
  }
});

// ダミー関数(実際の実装に置き換える)
async function fetchDataById(id: string): Promise<any> {
  // データベースやAPIからデータを取得
  return { id, data: `Data for ${id}` };
}

async function processData(data: any): Promise<any> {
  // データ処理ロジック
  return { processed: true, ...data };
}

📊 パフォーマンス監視とプロファイリング

実際のパフォーマンスを測定し、改善点を特定します。

class PerformanceMonitor {
  private metrics = new Map<string, number[]>();
  private counters = new Map<string, number>();

  startTimer(operation: string): () => void {
    const start = Date.now();
    
    return () => {
      const duration = Date.now() - start;
      this.recordMetric(operation, duration);
      return duration;
    };
  }

  recordMetric(name: string, value: number) {
    if (!this.metrics.has(name)) {
      this.metrics.set(name, []);
    }
    
    const values = this.metrics.get(name)!;
    values.push(value);
    
    // 最新1000件のみ保持
    if (values.length > 1000) {
      values.shift();
    }
  }

  incrementCounter(name: string) {
    const current = this.counters.get(name) || 0;
    this.counters.set(name, current + 1);
  }

  getMetrics() {
    const result: Record<string, any> = {};
    
    for (const [name, values] of this.metrics) {
      if (values.length > 0) {
        const sorted = [...values].sort((a, b) => a - b);
        result[name] = {
          count: values.length,
          average: values.reduce((a, b) => a + b, 0) / values.length,
          median: sorted[Math.floor(sorted.length / 2)],
          p95: sorted[Math.floor(sorted.length * 0.95)],
          min: Math.min(...values),
          max: Math.max(...values)
        };
      }
    }
    
    result.counters = Object.fromEntries(this.counters);
    result.memory = process.memoryUsage();
    
    return result;
  }

  reset() {
    this.metrics.clear();
    this.counters.clear();
  }
}

const monitor = new PerformanceMonitor();

// ツール実行の監視
const originalCallHandler = server.setRequestHandler;
server.setRequestHandler = function(schema: any, handler: any) {
  const wrappedHandler = async (request: any) => {
    const toolName = request.params.name;
    const endTimer = monitor.startTimer(`tool_${toolName}`);
    
    try {
      monitor.incrementCounter('total_requests');
      monitor.incrementCounter(`tool_${toolName}_requests`);
      
      const result = await handler(request);
      
      monitor.incrementCounter('successful_requests');
      endTimer();
      
      return result;
    } catch (error) {
      monitor.incrementCounter('failed_requests');
      endTimer();
      throw error;
    }
  };
  
  return originalCallHandler.call(this, schema, wrappedHandler);
};

// メトリクスレポート用のツール
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name } = request.params;

  if (name === "get_performance_metrics") {
    const metrics = monitor.getMetrics();
    
    return {
      content: [{
        type: "text",
        text: JSON.stringify(metrics, null, 2)
      }]
    };
  }
});

🎯 まとめ:効率的なMCPサーバーの実現

パフォーマンス改善の4つの柱

  1. データ分割: 大きなデータセットを適切な単位に分割して処理
  2. ストリーミング: ファイルやデータを部分的に読み込み、メモリ効率を向上
  3. キャッシング: 頻繁にアクセスされるデータを効果的にキャッシュ
  4. リソース管理: 同時実行数の制限とメモリ監視

実装時のベストプラクティス

  • 段階的な最適化: 最初は動作することを重視し、その後ボトルネックを特定して改善
  • メトリクス駆動: 実際の測定結果に基づいて最適化の優先度を決定
  • 適切な閾値設定: メモリ使用量、同時実行数、キャッシュサイズの適切な制限
  • エラーハンドリング: パフォーマンス最適化がエラーの原因にならないよう注意

これらの手法を適用することで、MCPサーバーは大規模なデータや多数のリクエストに対しても安定したパフォーマンスを維持できるようになります。

次回は、複数サービス統合をテーマに、GitHubとSlackを連携させる実践的なMCP実装例を解説します。お楽しみに!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?