2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Agent-to-Agent通信とMCPを学ぶマルチエージェントシステム開発 - 03_MCPの実装について(詳細)

2
Last updated at Posted at 2025-12-16

Model Context Protocol(MCP)でデータソース統合を実現

はじめに

前回のアーキテクチャ編では、Agent-to-Agent(A2A)通信の実装方法を詳しく解説しました。今回は、Model Context Protocol(MCP) の具体的な実装について深掘りします。MCPは、AIエージェントと外部データソースを効率的に接続するための標準プロトコルで、マルチエージェントシステムにおけるデータアクセス層の統一化を実現します。

🔄 マルチエージェント協調におけるMCPの位置づけ

MCPは単なるデータアクセスプロトコルではなく、マルチエージェントシステムの情報基盤として重要な役割を果たします:

📊 マルチエージェントシステムにおけるMCPの役割

User Query
    ↓
Coordinator Agent(A2A通信による調整)
    ↓ ↙ ↘
Agent A   Agent B   Agent C(各専門エージェント)
    ↓       ↓       ↓
    MCP     MCP     MCP(統一データアクセス)
    ↓       ↓       ↓
Database A  API B  Service C(外部データソース)

MCPによって実現される価値:

  • データアクセスの統一化: 全エージェントが同じ方法でデータにアクセス
  • A2A通信での情報整合性: エージェント間でのデータ共有時の一貫性保証
  • リアルタイム協調の効率化: データ取得の高速化による協調処理の最適化

🎯 MCPが解決する課題

従来のデータアクセスの問題

# ❌ 各エージェントが個別にデータアクセス実装
class CustomerAgent:
    def __init__(self):
        # データベース接続設定
        self.db_config = {
            "host": "localhost",
            "user": "app_user",
            "password": "secret",
            "database": "customer_db"
        }
        
    async def get_customer_info(self, customer_id: str):
        # 個別のデータベース接続
        conn = await asyncpg.connect(**self.db_config)
        try:
            result = await conn.fetchrow(
                "SELECT * FROM customers WHERE id = $1", customer_id
            )
            return dict(result) if result else None
        finally:
            await conn.close()

class CityAgent:
    def __init__(self):
        # 別のAPI接続設定
        self.api_config = {
            "base_url": "https://city-api.example.com",
            "api_key": "city_api_key"
        }
    
    async def get_traffic_data(self, location: str):
        # 個別のAPI接続
        async with aiohttp.ClientSession() as session:
            headers = {"Authorization": f"Bearer {self.api_config['api_key']}"}
            async with session.get(
                f"{self.api_config['base_url']}/traffic/{location}",
                headers=headers
            ) as response:
                return await response.json()

問題点:

  • 🔄 コードの重複: 各エージェントで類似のデータアクセスコード
  • 🔧 保守性の低下: データソース変更時の影響範囲が大
  • 🔐 セキュリティリスク: 認証情報の分散管理
  • 📊 非統一: データ形式やエラーハンドリングの不統一

MCPによる解決

# ✅ MCP経由でのデータアクセス
class CustomerAgent(BaseAgent):
    def __init__(self):
        super().__init__("customer_agent", "Customer Support Agent")
        self.mcp_client = MCPClient()
    
    async def get_customer_info(self, customer_id: str):
        # 統一されたインターフェース
        response = await self.mcp_client.query_data_source(
            source="customer_database",
            action="get_customer",
            parameters={"customer_id": customer_id}
        )
        return response.data

class CityAgent(BaseAgent):
    def __init__(self):
        super().__init__("city_agent", "Smart City Management Agent")
        self.mcp_client = MCPClient()
    
    async def get_traffic_data(self, location: str):
        # 同じインターフェース
        response = await self.mcp_client.query_data_source(
            source="traffic_api",
            action="get_traffic_status",
            parameters={"location": location}
        )
        return response.data

🧠 MCP + GPT-4o基盤推論システム統合 (NEW)

従来のMCPによるデータアクセス統一に加えて、GPT-4o基盤推論機能 による真のAI判断 を統合しました。

🔮 将来性: 現在はGPT-4oを使用していますが、GPT-5.1やOpenAI o1シリーズの専用推論モデルが利用可能になった際には、さらに高度な推論ベースのデータ選択が実現されます。

AI判断による動的データソース選択

# ✅ AI判断による最適データソース選択
class IntelligentMCPClient:
    """推論機能統合MCPクライアント"""
    
    def __init__(self):
        self.mcp_client = MCPClient()
        self.reasoning_engine = AdvancedReasoningAgent(
            "mcp_reasoning_agent",
            "MCP Reasoning Agent",
            ["data_analysis", "source_optimization"]
        )
    
    async def intelligent_data_query(self, user_query: str) -> Dict[str, Any]:
        """AI判断による最適データクエリ実行"""
        
        # GPT-4oによる推論分析(将来的にGPT-5.1/o1対応予定)
        reasoning_result = await self.reasoning_engine.execute_deep_reasoning(f"""
        ユーザークエリ: "{user_query}"
        
        以下の観点から最適なデータソースとクエリ戦略を分析してください:
        1. 必要な情報の種類と優先度
        2. 利用可能なデータソース: {list(self.available_sources.keys())}
        3. レスポンス時間とデータ品質のトレードオフ
        4. 複数ソース統合の必要性
        """)
        
        # 推論結果に基づく実行計画
        execution_plan = reasoning_result["recommended_solution"]["implementation_plan"]
        
        results = {}
        for phase in execution_plan:
            for action in phase["actions"]:
                if "query_source" in action:
                    source_result = await self._execute_data_query(
                        action["source"],
                        action["query_parameters"]
                    )
                    results[action["source"]] = source_result
        
        # AI統合による結果最適化
        integrated_result = await self._intelligent_result_integration(
            results, user_query, reasoning_result
        )
        
        return integrated_result
    
    async def _intelligent_result_integration(self, 
                                            source_results: Dict[str, Any],
                                            original_query: str,
                                            reasoning_context: Dict[str, Any]) -> Dict[str, Any]:
        """AI判断による結果統合"""
        
        integration_prompt = f"""
        元のクエリ: "{original_query}"
        収集データ: {json.dumps(source_results, ensure_ascii=False, indent=2)}
        推論コンテキスト: {reasoning_context}
        
        以下を実行してください:
        1. データ間の関連性分析
        2. 矛盾点の特定と調整
        3. ユーザーニーズに最適化した統合
        4. 信頼性評価と根拠提示
        """
        
        integration_result = await self.reasoning_engine.execute_deep_reasoning(
            integration_prompt, ReasoningMode.STRATEGIC
        )
        
        return {
            "integrated_data": integration_result["recommended_solution"],
            "data_sources": list(source_results.keys()),
            "confidence_score": integration_result["quality_assessment"]["confidence_score"],
            "reasoning_trace": integration_result["reasoning_steps"]
        }

推論駆動MCPサーバー

# src/mcp/servers/intelligent_knowledge_server.py
class IntelligentKnowledgeServer(MCPServer):
    """推論機能統合ナレッジサーバー"""
    
    async def handle_intelligent_query(self, query_data: Dict[str, Any]) -> Dict[str, Any]:
        """AI推論による知識統合クエリ処理"""
        
        query = query_data["query"]
        context = query_data.get("context", {})
        
        # マルチエージェント協調推論実行
        reasoning_coordinator = MultiAgentReasoningCoordinator()
        
        reasoning_task = ReasoningTask(
            task_id=f"knowledge_query_{uuid.uuid4().hex[:8]}",
            description=f"ナレッジベース統合クエリ: {query}",
            complexity=self._assess_query_complexity(query),
            required_expertise=["knowledge_management", "information_retrieval"],
            constraints={"response_time": "fast", "accuracy": "high"},
            success_criteria={"relevance": 0.9, "completeness": 0.85},
            priority=3
        )
        
        # 協調推論による最適回答生成
        reasoning_result = await reasoning_coordinator.execute_multi_agent_reasoning(
            reasoning_task=reasoning_task,
            orchestration_strategy=OrchestrationStrategy.ADAPTIVE
        )
        
        return {
            "answer": reasoning_result["final_synthesis"]["integrated_solution"]["final_recommendation"],
            "reasoning_process": reasoning_result["reasoning_results"],
            "confidence": reasoning_result["final_synthesis"]["collaboration_quality"]["synthesis_confidence"],
            "sources": self._extract_knowledge_sources(reasoning_result),
            "session_id": reasoning_result["session_id"]
        }

推論統合MCPの利点:

  1. 🎯 動的最適化: AIがクエリに応じて最適なデータソースを選択
  2. 🧠 知識統合: 複数ソースからの情報を推論により統合
  3. 📊 品質保証: AI評価による結果の信頼性判定
  4. 🔄 学習改善: 推論履歴からの継続的システム改善

🏗️ MCP アーキテクチャ

全体構成

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Customer      │    │   Smart City    │    │   Enterprise    │
│     Agent       │    │     Agent       │    │     Agent       │
└─────────┬───────┘    └─────────┬───────┘    └─────────┬───────┘
          │                      │                      │
          └──────────┬───────────┬──────────────────────┘
                     │           │
              ┌──────▼───────────▼──────┐
              │     MCP Client Layer    │
              └──────┬───────────┬──────┘
                     │           │
        ┌────────────▼──┐   ┌────▼────────────┐
        │  WebSocket    │   │   WebSocket     │
        │ Connection    │   │  Connection     │
        └────────────┬──┘   └────┬────────────┘
                     │           │
     ┌───────────────▼──┐   ┌────▼──────────────┐   ┌─────────────────┐
     │ Customer Data    │   │ City Data         │   │ Enterprise Data │
     │   MCP Server     │   │  MCP Server       │   │   MCP Server    │
     └───────┬──────────┘   └────┬──────────────┘   └─────────┬───────┘
             │                   │                            │
     ┌───────▼──────────┐   ┌────▼──────────────┐   ┌─────────▼───────┐
     │   Customer       │   │   Traffic API     │   │   HR Database   │
     │   Database       │   │   Energy API      │   │ Finance Database│
     └──────────────────┘   └───────────────────┘   └─────────────────┘

MCPプロトコルの基本構造

# src/mcp/protocol.py
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional, Union
from pydantic import BaseModel
import websockets
import json
import asyncio

class MCPMessage(BaseModel):
    """MCP基本メッセージ形式"""
    jsonrpc: str = "2.0"
    id: Optional[Union[str, int]] = None
    method: Optional[str] = None
    params: Optional[Dict[str, Any]] = None
    result: Optional[Dict[str, Any]] = None
    error: Optional[Dict[str, Any]] = None

class MCPResource(BaseModel):
    """MCPリソース定義"""
    uri: str
    name: str
    description: Optional[str] = None
    mimeType: Optional[str] = None

class MCPTool(BaseModel):
    """MCPツール定義"""
    name: str
    description: str
    inputSchema: Dict[str, Any]

class MCPServer(ABC):
    """MCP Server 基底クラス"""
    
    def __init__(self, name: str, version: str = "1.0.0"):
        self.name = name
        self.version = version
        self.resources: List[MCPResource] = []
        self.tools: List[MCPTool] = []
        self.clients = set()
    
    @abstractmethod
    async def handle_list_resources(self) -> List[MCPResource]:
        """利用可能なリソース一覧を返す"""
        # 各サーバーで具体的なリソースを定義
        # 例: データベーステーブル、APIエンドポイント等
        return [
            MCPResource(
                uri="mcp://example/resource",
                name="サンプルリソース",
                description="実装例用リソース",
                mimeType="application/json"
            )
        ]
    
    @abstractmethod
    async def handle_read_resource(self, uri: str) -> Dict[str, Any]:
        """指定されたリソースを読み取る"""
        # URIに基づいてリソースを取得
        if uri == "mcp://example/resource":
            return {
                "content": {
                    "data": "サンプルデータ",
                    "timestamp": datetime.now().isoformat()
                },
                "mimeType": "application/json"
            }
        else:
            raise ValueError(f"リソースが見つかりません: {uri}")
    
    @abstractmethod
    async def handle_call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """指定されたツールを実行する"""
        # ツール名に基づいた処理を実行
        if name == "example_tool":
            return {
                "result": f"ツール実行結果: {arguments}",
                "status": "success",
                "timestamp": datetime.now().isoformat()
            }
        else:
            raise ValueError(f"未対応のツール: {name}")
    
    async def start_server(self, host: str = "localhost", port: int = 8765):
        """WebSocketサーバー開始"""
        print(f"Starting MCP Server '{self.name}' on {host}:{port}")
        await websockets.serve(self.handle_client, host, port)
    
    async def handle_client(self, websocket, path):
        """クライアント接続処理"""
        self.clients.add(websocket)
        try:
            await self.client_session(websocket)
        finally:
            self.clients.remove(websocket)
    
    async def client_session(self, websocket):
        """クライアントセッション管理"""
        async for message in websocket:
            try:
                data = json.loads(message)
                mcp_message = MCPMessage(**data)
                response = await self.process_message(mcp_message)
                
                if response:
                    await websocket.send(json.dumps(response.dict(exclude_none=True)))
                    
            except Exception as e:
                error_response = MCPMessage(
                    id=data.get("id") if "data" in locals() else None,
                    error={
                        "code": -32603,
                        "message": "Internal error",
                        "data": str(e)
                    }
                )
                await websocket.send(json.dumps(error_response.dict(exclude_none=True)))
    
    async def process_message(self, message: MCPMessage) -> Optional[MCPMessage]:
        """メッセージ処理のルーティング"""
        if message.method == "initialize":
            return await self.handle_initialize(message)
        elif message.method == "resources/list":
            resources = await self.handle_list_resources()
            return MCPMessage(
                id=message.id,
                result={"resources": [r.dict() for r in resources]}
            )
        elif message.method == "resources/read":
            uri = message.params.get("uri")
            result = await self.handle_read_resource(uri)
            return MCPMessage(
                id=message.id,
                result=result
            )
        elif message.method == "tools/list":
            return MCPMessage(
                id=message.id,
                result={"tools": [t.dict() for t in self.tools]}
            )
        elif message.method == "tools/call":
            name = message.params.get("name")
            arguments = message.params.get("arguments", {})
            result = await self.handle_call_tool(name, arguments)
            return MCPMessage(
                id=message.id,
                result=result
            )
        else:
            return MCPMessage(
                id=message.id,
                error={
                    "code": -32601,
                    "message": f"Method not found: {message.method}"
                }
            )
    
    async def handle_initialize(self, message: MCPMessage) -> MCPMessage:
        """初期化処理"""
        return MCPMessage(
            id=message.id,
            result={
                "serverInfo": {
                    "name": self.name,
                    "version": self.version
                },
                "capabilities": {
                    "resources": {"subscribe": True, "listChanged": True},
                    "tools": {"listChanged": True}
                }
            }
        )

🗃️ 具体的なMCPサーバー実装

1. カスタマーデータMCPサーバー

# src/mcp/servers/customer_server.py
class CustomerDataServer(MCPServer):
    """顧客データ専用MCPサーバー"""
    
    def __init__(self):
        super().__init__("customer-data-server", "1.0.0")
        self.customer_database = {}  # 実際にはデータベース接続
        self.init_resources_and_tools()
        self.init_sample_data()
    
    def init_resources_and_tools(self):
        """リソースとツールの定義"""
        # 利用可能なリソース
        self.resources = [
            MCPResource(
                uri="customer://profiles",
                name="Customer Profiles",
                description="Customer profile information and history",
                mimeType="application/json"
            ),
            MCPResource(
                uri="customer://tickets",
                name="Support Tickets",
                description="Customer support ticket database",
                mimeType="application/json"
            ),
            MCPResource(
                uri="customer://analytics",
                name="Customer Analytics",
                description="Customer behavior analytics and insights",
                mimeType="application/json"
            )
        ]
        
        # 利用可能なツール
        self.tools = [
            MCPTool(
                name="get_customer_profile",
                description="Retrieve detailed customer profile information",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "customer_id": {
                            "type": "string",
                            "description": "Unique customer identifier"
                        }
                    },
                    "required": ["customer_id"]
                }
            ),
            MCPTool(
                name="search_customers",
                description="Search customers by various criteria",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query"
                        },
                        "filters": {
                            "type": "object",
                            "description": "Additional search filters"
                        }
                    },
                    "required": ["query"]
                }
            ),
            MCPTool(
                name="create_support_ticket",
                description="Create a new customer support ticket",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "customer_id": {"type": "string"},
                        "issue_type": {"type": "string"},
                        "description": {"type": "string"},
                        "priority": {
                            "type": "string",
                            "enum": ["low", "medium", "high", "urgent"]
                        }
                    },
                    "required": ["customer_id", "issue_type", "description"]
                }
            )
        ]
    
    def init_sample_data(self):
        """サンプルデータの初期化"""
        self.customer_database = {
            "12345": {
                "id": "12345",
                "name": "田中太郎",
                "email": "tanaka@example.com",
                "phone": "090-1234-5678",
                "membership_level": "Premium",
                "registration_date": "2021-03-15",
                "last_activity": "2024-01-10",
                "total_orders": 47,
                "total_spent": 284750,
                "preferred_contact": "email",
                "support_history": [
                    {
                        "ticket_id": "TKT-001",
                        "date": "2024-01-05",
                        "issue": "商品未着",
                        "status": "resolved",
                        "resolution_time": "2時間"
                    }
                ]
            },
            "67890": {
                "id": "67890",
                "name": "佐藤花子",
                "email": "sato@example.com",
                "phone": "080-9876-5432",
                "membership_level": "Standard",
                "registration_date": "2023-06-20",
                "last_activity": "2024-01-12",
                "total_orders": 12,
                "total_spent": 67890,
                "preferred_contact": "phone"
            }
        }
        
        self.ticket_database = {
            "TKT-001": {
                "id": "TKT-001",
                "customer_id": "12345",
                "issue_type": "delivery",
                "description": "注文した商品が予定日を過ぎても届きません",
                "priority": "high",
                "status": "resolved",
                "created_at": "2024-01-05T10:30:00Z",
                "resolved_at": "2024-01-05T12:30:00Z",
                "agent_id": "agent_001",
                "resolution": "配送業者確認の結果、台風による遅延。代替配送手配済み"
            }
        }
    
    async def handle_list_resources(self) -> List[MCPResource]:
        """利用可能なリソース一覧"""
        return self.resources
    
    async def handle_read_resource(self, uri: str) -> Dict[str, Any]:
        """リソース読み取り"""
        if uri == "customer://profiles":
            return {
                "contents": [
                    {
                        "uri": uri,
                        "mimeType": "application/json",
                        "text": json.dumps(list(self.customer_database.values()), 
                                         ensure_ascii=False, indent=2)
                    }
                ]
            }
        elif uri == "customer://tickets":
            return {
                "contents": [
                    {
                        "uri": uri,
                        "mimeType": "application/json",
                        "text": json.dumps(list(self.ticket_database.values()),
                                         ensure_ascii=False, indent=2)
                    }
                ]
            }
        elif uri == "customer://analytics":
            analytics = self._generate_customer_analytics()
            return {
                "contents": [
                    {
                        "uri": uri,
                        "mimeType": "application/json",
                        "text": json.dumps(analytics, ensure_ascii=False, indent=2)
                    }
                ]
            }
        else:
            raise ValueError(f"Unknown resource URI: {uri}")
    
    async def handle_call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """ツール実行"""
        if name == "get_customer_profile":
            customer_id = arguments["customer_id"]
            customer = self.customer_database.get(customer_id)
            
            if customer:
                return {
                    "success": True,
                    "data": customer
                }
            else:
                return {
                    "success": False,
                    "error": f"Customer {customer_id} not found"
                }
        
        elif name == "search_customers":
            query = arguments["query"].lower()
            filters = arguments.get("filters", {})
            
            results = []
            for customer in self.customer_database.values():
                # 簡単な検索実装
                if (query in customer["name"].lower() or 
                    query in customer["email"].lower()):
                    
                    # フィルター適用
                    match = True
                    for key, value in filters.items():
                        if key in customer and customer[key] != value:
                            match = False
                            break
                    
                    if match:
                        results.append(customer)
            
            return {
                "success": True,
                "data": results,
                "total_count": len(results)
            }
        
        elif name == "create_support_ticket":
            ticket_id = f"TKT-{len(self.ticket_database) + 1:03d}"
            ticket = {
                "id": ticket_id,
                "customer_id": arguments["customer_id"],
                "issue_type": arguments["issue_type"],
                "description": arguments["description"],
                "priority": arguments.get("priority", "medium"),
                "status": "open",
                "created_at": datetime.now().isoformat(),
                "agent_id": None
            }
            
            self.ticket_database[ticket_id] = ticket
            
            return {
                "success": True,
                "data": ticket,
                "estimated_resolution_time": self._estimate_resolution_time(ticket)
            }
        
        else:
            raise ValueError(f"Unknown tool: {name}")
    
    def _generate_customer_analytics(self) -> Dict[str, Any]:
        """顧客アナリティクス生成"""
        total_customers = len(self.customer_database)
        total_orders = sum(c.get("total_orders", 0) for c in self.customer_database.values())
        total_revenue = sum(c.get("total_spent", 0) for c in self.customer_database.values())
        
        membership_breakdown = {}
        for customer in self.customer_database.values():
            level = customer.get("membership_level", "Standard")
            membership_breakdown[level] = membership_breakdown.get(level, 0) + 1
        
        return {
            "summary": {
                "total_customers": total_customers,
                "total_orders": total_orders,
                "total_revenue": total_revenue,
                "average_order_value": total_revenue / total_orders if total_orders > 0 else 0
            },
            "membership_breakdown": membership_breakdown,
            "top_customers": sorted(
                self.customer_database.values(),
                key=lambda x: x.get("total_spent", 0),
                reverse=True
            )[:5]
        }
    
    def _estimate_resolution_time(self, ticket: Dict[str, Any]) -> str:
        """解決時間の見積もり"""
        priority_times = {
            "urgent": "1時間以内",
            "high": "4時間以内", 
            "medium": "1営業日以内",
            "low": "3営業日以内"
        }
        return priority_times.get(ticket["priority"], "3営業日以内")

# サーバー起動
async def start_customer_server():
    server = CustomerDataServer()
    await server.start_server("localhost", 8767)

if __name__ == "__main__":
    asyncio.run(start_customer_server())

2. スマートシティデータMCPサーバー

# src/mcp/servers/city_server.py
class CityDataServer(MCPServer):
    """スマートシティデータ専用MCPサーバー"""
    
    def __init__(self):
        super().__init__("city-data-server", "1.0.0")
        self.traffic_data = {}
        self.energy_data = {}
        self.emergency_data = {}
        self.init_resources_and_tools()
        self.init_sample_data()
    
    def init_resources_and_tools(self):
        """リソースとツールの定義"""
        self.resources = [
            MCPResource(
                uri="city://traffic",
                name="Traffic Management",
                description="Real-time traffic data and control systems",
                mimeType="application/json"
            ),
            MCPResource(
                uri="city://energy",
                name="Energy Management", 
                description="City-wide energy consumption and distribution",
                mimeType="application/json"
            ),
            MCPResource(
                uri="city://emergency",
                name="Emergency Response",
                description="Emergency incidents and response coordination",
                mimeType="application/json"
            )
        ]
        
        self.tools = [
            MCPTool(
                name="get_traffic_status",
                description="Get current traffic status for specified locations",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "location": {
                            "type": "string",
                            "description": "Location identifier or area name"
                        },
                        "radius": {
                            "type": "number",
                            "description": "Search radius in kilometers"
                        }
                    },
                    "required": ["location"]
                }
            ),
            MCPTool(
                name="control_traffic_lights",
                description="Control traffic light timing for optimization",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "intersection_id": {"type": "string"},
                        "timing_plan": {"type": "object"}
                    },
                    "required": ["intersection_id", "timing_plan"]
                }
            ),
            MCPTool(
                name="get_energy_consumption",
                description="Get energy consumption data for city zones",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "zone": {"type": "string"},
                        "timeframe": {"type": "string"}
                    },
                    "required": ["zone"]
                }
            ),
            MCPTool(
                name="report_emergency",
                description="Report and track emergency incidents",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "incident_type": {"type": "string"},
                        "location": {"type": "string"},
                        "severity": {"type": "string"},
                        "description": {"type": "string"}
                    },
                    "required": ["incident_type", "location", "severity"]
                }
            )
        ]
    
    def init_sample_data(self):
        """サンプルデータの初期化"""
        self.traffic_data = {
            "新宿駅周辺": {
                "location": "新宿駅周辺",
                "congestion_level": "high",
                "average_speed": 15.2,
                "traffic_volume": 1250,
                "incidents": [
                    {
                        "type": "工事",
                        "location": "新宿通り",
                        "impact": "1車線規制",
                        "duration": "2024-01-10〜2024-01-20"
                    }
                ],
                "alternative_routes": [
                    "明治通り経由",
                    "青梅街道経由"
                ],
                "last_updated": "2024-01-12T14:30:00Z"
            },
            "渋谷駅周辺": {
                "location": "渋谷駅周辺", 
                "congestion_level": "medium",
                "average_speed": 22.8,
                "traffic_volume": 890,
                "incidents": [],
                "last_updated": "2024-01-12T14:30:00Z"
            }
        }
        
        self.energy_data = {
            "新宿区": {
                "zone": "新宿区",
                "current_consumption": 45.7,  # MW
                "peak_consumption": 52.3,
                "renewable_percentage": 23.5,
                "grid_status": "normal",
                "demand_forecast": {
                    "next_hour": 47.2,
                    "next_4_hours": 51.8,
                    "peak_time": "18:00-19:00"
                },
                "last_updated": "2024-01-12T14:30:00Z"
            }
        }
    
    async def handle_call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """ツール実行"""
        if name == "get_traffic_status":
            location = arguments["location"]
            radius = arguments.get("radius", 1.0)
            
            # 指定された場所の交通情報を検索
            traffic_info = self.traffic_data.get(location)
            
            if traffic_info:
                return {
                    "success": True,
                    "data": traffic_info,
                    "recommendations": self._generate_traffic_recommendations(traffic_info)
                }
            else:
                # 近隣エリアの検索(簡略化)
                nearby_areas = [area for area in self.traffic_data.keys() 
                              if location in area or area in location]
                
                if nearby_areas:
                    return {
                        "success": True,
                        "data": [self.traffic_data[area] for area in nearby_areas],
                        "message": f"Nearby areas found for {location}"
                    }
                else:
                    return {
                        "success": False,
                        "error": f"No traffic data available for {location}"
                    }
        
        elif name == "control_traffic_lights":
            intersection_id = arguments["intersection_id"]
            timing_plan = arguments["timing_plan"]
            
            # 交通信号制御の実装(シミュレーション)
            control_result = {
                "intersection_id": intersection_id,
                "previous_timing": {"green": 45, "yellow": 3, "red": 42},
                "new_timing": timing_plan,
                "estimated_improvement": "15% reduction in wait time",
                "implementation_time": "2024-01-12T15:00:00Z"
            }
            
            return {
                "success": True,
                "data": control_result
            }
        
        elif name == "get_energy_consumption":
            zone = arguments["zone"]
            timeframe = arguments.get("timeframe", "current")
            
            energy_info = self.energy_data.get(zone)
            
            if energy_info:
                return {
                    "success": True,
                    "data": energy_info,
                    "optimization_suggestions": self._generate_energy_optimization(energy_info)
                }
            else:
                return {
                    "success": False,
                    "error": f"No energy data available for zone {zone}"
                }
        
        elif name == "report_emergency":
            emergency_id = f"EMG-{len(self.emergency_data) + 1:04d}"
            
            emergency = {
                "id": emergency_id,
                "incident_type": arguments["incident_type"],
                "location": arguments["location"],
                "severity": arguments["severity"],
                "description": arguments.get("description", ""),
                "status": "reported",
                "reported_at": datetime.now().isoformat(),
                "response_units": self._assign_response_units(arguments),
                "estimated_response_time": self._calculate_response_time(arguments)
            }
            
            self.emergency_data[emergency_id] = emergency
            
            return {
                "success": True,
                "data": emergency,
                "immediate_actions": self._generate_immediate_actions(emergency)
            }
        
        else:
            raise ValueError(f"Unknown tool: {name}")
    
    def _generate_traffic_recommendations(self, traffic_info: Dict[str, Any]) -> List[str]:
        """交通最適化の推奨事項"""
        recommendations = []
        
        if traffic_info["congestion_level"] == "high":
            recommendations.extend([
                "代替ルートの利用を推奨",
                "公共交通機関の利用を検討",
                "ピーク時間外の移動を推奨"
            ])
        
        if traffic_info["incidents"]:
            recommendations.append("工事・事故情報を確認してルート選択")
        
        return recommendations
    
    def _generate_energy_optimization(self, energy_info: Dict[str, Any]) -> List[str]:
        """エネルギー最適化の提案"""
        suggestions = []
        
        current = energy_info["current_consumption"]
        peak = energy_info["peak_consumption"]
        
        if current > peak * 0.9:
            suggestions.extend([
                "非重要施設の電力使用を削減",
                "蓄電池からの放電開始",
                "需要調整契約の発動検討"
            ])
        
        if energy_info["renewable_percentage"] < 30:
            suggestions.append("再生可能エネルギーの比率向上")
        
        return suggestions
    
    def _assign_response_units(self, emergency: Dict[str, Any]) -> List[str]:
        """緊急対応ユニットの割り当て"""
        incident_type = emergency["incident_type"]
        severity = emergency["severity"]
        
        units = []
        
        if incident_type in ["fire", "explosion"]:
            units.extend(["消防車2台", "救急車1台"])
            if severity == "high":
                units.extend(["特殊救助車", "指揮車"])
        
        elif incident_type in ["accident", "traffic"]:
            units.extend(["パトカー1台", "救急車1台"])
            if severity == "high":
                units.append("交通機動隊")
        
        elif incident_type == "medical":
            units.append("救急車1台")
            if severity == "high":
                units.extend(["ドクターカー", "消防車"])
        
        return units
    
    def _calculate_response_time(self, emergency: Dict[str, Any]) -> str:
        """対応時間の計算"""
        severity_times = {
            "low": "15-20分",
            "medium": "8-12分",
            "high": "3-5分",
            "critical": "1-3分"
        }
        
        return severity_times.get(emergency["severity"], "10-15分")
    
    def _generate_immediate_actions(self, emergency: Dict[str, Any]) -> List[str]:
        """即座に実行すべきアクション"""
        actions = [
            f"対応ユニット {', '.join(emergency['response_units'])} に出動指示",
            f"現場 {emergency['location']} の交通規制検討"
        ]
        
        if emergency["severity"] in ["high", "critical"]:
            actions.extend([
                "近隣住民への避難指示準備",
                "医療機関へのアラート送信",
                "メディア対応準備"
            ])
        
        return actions

# サーバー起動
async def start_city_server():
    server = CityDataServer()
    await server.start_server("localhost", 8768)

if __name__ == "__main__":
    asyncio.run(start_city_server())

🔌 MCPクライアント実装

統一されたMCPクライアント

# src/mcp/client.py
class MCPClient:
    """MCP統一クライアント"""
    
    def __init__(self):
        self.connections = {}
        self.server_configs = {
            "customer_database": {
                "url": "ws://localhost:8767",
                "capabilities": ["resources", "tools"]
            },
            "city_management": {
                "url": "ws://localhost:8768", 
                "capabilities": ["resources", "tools"]
            },
            "enterprise_systems": {
                "url": "ws://localhost:8769",
                "capabilities": ["resources", "tools"]
            }
        }
        self.message_id_counter = 0
    
    async def connect_to_server(self, server_name: str):
        """MCPサーバーへの接続"""
        if server_name not in self.server_configs:
            raise ValueError(f"Unknown server: {server_name}")
        
        config = self.server_configs[server_name]
        
        try:
            websocket = await websockets.connect(config["url"])
            
            # 初期化メッセージ送信
            init_message = MCPMessage(
                id=self._get_next_id(),
                method="initialize",
                params={
                    "protocolVersion": "2024-11-05",
                    "clientInfo": {
                        "name": "MultiAgent MCP Client",
                        "version": "1.0.0"
                    },
                    "capabilities": {
                        "resources": {"subscribe": True},
                        "tools": {}
                    }
                }
            )
            
            await websocket.send(json.dumps(init_message.dict(exclude_none=True)))
            
            # 応答を待機
            response = await websocket.recv()
            response_data = json.loads(response)
            
            if "error" in response_data:
                raise Exception(f"Initialization failed: {response_data['error']}")
            
            self.connections[server_name] = websocket
            print(f"Connected to MCP server: {server_name}")
            
            return True
            
        except Exception as e:
            print(f"Failed to connect to {server_name}: {e}")
            return False
    
    async def query_data_source(self, source: str, action: str, 
                              parameters: Dict[str, Any] = None) -> Dict[str, Any]:
        """データソースへのクエリ"""
        server_name = self._get_server_for_source(source)
        
        if server_name not in self.connections:
            await self.connect_to_server(server_name)
        
        websocket = self.connections[server_name]
        
        # アクションに応じてMCPメソッドを決定
        if action.startswith("get_") or action.startswith("search_"):
            method = "tools/call"
            params = {
                "name": action,
                "arguments": parameters or {}
            }
        else:
            method = "tools/call"
            params = {
                "name": action,
                "arguments": parameters or {}
            }
        
        # メッセージ送信
        message = MCPMessage(
            id=self._get_next_id(),
            method=method,
            params=params
        )
        
        await websocket.send(json.dumps(message.dict(exclude_none=True)))
        
        # 応答受信
        response = await websocket.recv()
        response_data = json.loads(response)
        
        if "error" in response_data:
            raise Exception(f"MCP query failed: {response_data['error']}")
        
        return response_data.get("result", {})
    
    async def list_available_resources(self, server_name: str) -> List[Dict[str, Any]]:
        """利用可能なリソース一覧取得"""
        if server_name not in self.connections:
            await self.connect_to_server(server_name)
        
        websocket = self.connections[server_name]
        
        message = MCPMessage(
            id=self._get_next_id(),
            method="resources/list"
        )
        
        await websocket.send(json.dumps(message.dict(exclude_none=True)))
        response = await websocket.recv()
        response_data = json.loads(response)
        
        return response_data.get("result", {}).get("resources", [])
    
    async def list_available_tools(self, server_name: str) -> List[Dict[str, Any]]:
        """利用可能なツール一覧取得"""
        if server_name not in self.connections:
            await self.connect_to_server(server_name)
        
        websocket = self.connections[server_name]
        
        message = MCPMessage(
            id=self._get_next_id(),
            method="tools/list"
        )
        
        await websocket.send(json.dumps(message.dict(exclude_none=True)))
        response = await websocket.recv()
        response_data = json.loads(response)
        
        return response_data.get("result", {}).get("tools", [])
    
    def _get_server_for_source(self, source: str) -> str:
        """データソースに対応するサーバーを特定"""
        source_mapping = {
            "customer_database": "customer_database",
            "customer_profiles": "customer_database", 
            "support_tickets": "customer_database",
            "traffic_data": "city_management",
            "energy_data": "city_management",
            "emergency_data": "city_management",
            "hr_database": "enterprise_systems",
            "finance_database": "enterprise_systems",
            "project_database": "enterprise_systems"
        }
        
        return source_mapping.get(source, "customer_database")
    
    def _get_next_id(self) -> int:
        """次のメッセージIDを取得"""
        self.message_id_counter += 1
        return self.message_id_counter
    
    async def close_all_connections(self):
        """全接続を閉じる"""
        for server_name, websocket in self.connections.items():
            try:
                await websocket.close()
                print(f"Closed connection to {server_name}")
            except:
                pass
        self.connections.clear()

# エージェントでの使用例
class EnhancedCustomerAgent(BaseAgent):
    """MCP対応カスタマーエージェント"""
    
    def __init__(self):
        super().__init__("customer_agent", "Enhanced Customer Agent")
        self.mcp_client = MCPClient()
    
    async def get_customer_info(self, customer_id: str) -> Dict[str, Any]:
        """MCP経由での顧客情報取得"""
        try:
            result = await self.mcp_client.query_data_source(
                source="customer_database",
                action="get_customer_profile",
                parameters={"customer_id": customer_id}
            )
            
            return result
            
        except Exception as e:
            return {
                "success": False,
                "error": f"Failed to retrieve customer info: {e}"
            }
    
    async def search_customers(self, query: str, filters: Dict[str, Any] = None):
        """MCP経由での顧客検索"""
        try:
            result = await self.mcp_client.query_data_source(
                source="customer_database",
                action="search_customers",
                parameters={"query": query, "filters": filters or {}}
            )
            
            return result
            
        except Exception as e:
            return {
                "success": False,
                "error": f"Customer search failed: {e}"
            }

🔧 MCPの利点と実用性

1. 統一されたデータアクセス

従来:

# 各エージェントが個別実装
customer_db = CustomerDatabase(host, user, password)
traffic_api = TrafficAPI(api_key, endpoint)
hr_system = HRSystem(oauth_token)

MCP使用:

# すべて統一インターフェース
mcp_client = MCPClient()
customer_data = await mcp_client.query_data_source("customer_database", "get_customer", params)
traffic_data = await mcp_client.query_data_source("traffic_data", "get_status", params)
hr_data = await mcp_client.query_data_source("hr_database", "get_employee", params)

2. エラーハンドリングの統一

class MCPErrorHandler:
    """MCP統一エラーハンドリング"""
    
    @staticmethod
    async def safe_query(mcp_client: MCPClient, source: str, 
                        action: str, parameters: Dict[str, Any],
                        fallback_action: Callable = None) -> Dict[str, Any]:
        """安全なMCPクエリ実行"""
        try:
            result = await mcp_client.query_data_source(source, action, parameters)
            return result
            
        except ConnectionError:
            logger.warning(f"Connection failed to {source}, retrying...")
            await asyncio.sleep(1)
            try:
                return await mcp_client.query_data_source(source, action, parameters)
            except:
                if fallback_action:
                    return await fallback_action(parameters)
                return {"success": False, "error": "Connection failed"}
        
        except Exception as e:
            logger.error(f"MCP query failed: {e}")
            return {"success": False, "error": str(e)}

3. 設定の一元管理

# config/mcp_config.yaml
mcp_servers:
  customer_database:
    url: "ws://localhost:8767"
    retry_attempts: 3
    timeout: 30
    capabilities: ["resources", "tools"]
    
  city_management:
    url: "ws://localhost:8768"
    retry_attempts: 3
    timeout: 30
    capabilities: ["resources", "tools"]
    
  enterprise_systems:
    url: "ws://localhost:8769"
    retry_attempts: 3
    timeout: 30
    capabilities: ["resources", "tools"]

# 設定読み込み
class MCPConfigManager:
    @staticmethod
    def load_config(config_path: str) -> Dict[str, Any]:
        with open(config_path, 'r') as file:
            return yaml.safe_load(file)

🎯 次回予告

次回の実践編 では、実際にシステムを動かしながら:

  • 開発環境のセットアップ方法
  • 実際の動作シーンの詳細解説
  • パフォーマンス測定と最適化ポイント
  • トラブルシューティングの実践方法

Model Context Protocolによって、マルチエージェントシステムのデータアクセス層が大幅に統一化・効率化されることを学びました。次回は、実際にシステムを動かして体験していきましょう。

📚 参考リンク


#MCP #ModelContextProtocol #WebSocket #DataIntegration #API #Python #MultiAgent

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?