Model Context Protocol(MCP)でデータソース統合を実現
はじめに
前回のアーキテクチャ編では、Agent-to-Agent(A2A)通信の実装方法を詳しく解説しました。今回は、Model Context Protocol(MCP) の具体的な実装について深掘りします。MCPは、AIエージェントと外部データソースを効率的に接続するための標準プロトコルで、マルチエージェントシステムにおけるデータアクセス層の統一化を実現します。
🔄 マルチエージェント協調におけるMCPの位置づけ
MCPは単なるデータアクセスプロトコルではなく、マルチエージェントシステムの情報基盤として重要な役割を果たします:
📊 マルチエージェントシステムにおけるMCPの役割
User Query
↓
Coordinator Agent(A2A通信による調整)
↓ ↙ ↘
Agent A Agent B Agent C(各専門エージェント)
↓ ↓ ↓
MCP MCP MCP(統一データアクセス)
↓ ↓ ↓
Database A API B Service C(外部データソース)
MCPによって実現される価値:
- データアクセスの統一化: 全エージェントが同じ方法でデータにアクセス
- A2A通信での情報整合性: エージェント間でのデータ共有時の一貫性保証
- リアルタイム協調の効率化: データ取得の高速化による協調処理の最適化
🎯 MCPが解決する課題
従来のデータアクセスの問題
# ❌ 各エージェントが個別にデータアクセス実装
class CustomerAgent:
def __init__(self):
# データベース接続設定
self.db_config = {
"host": "localhost",
"user": "app_user",
"password": "secret",
"database": "customer_db"
}
async def get_customer_info(self, customer_id: str):
# 個別のデータベース接続
conn = await asyncpg.connect(**self.db_config)
try:
result = await conn.fetchrow(
"SELECT * FROM customers WHERE id = $1", customer_id
)
return dict(result) if result else None
finally:
await conn.close()
class CityAgent:
def __init__(self):
# 別のAPI接続設定
self.api_config = {
"base_url": "https://city-api.example.com",
"api_key": "city_api_key"
}
async def get_traffic_data(self, location: str):
# 個別のAPI接続
async with aiohttp.ClientSession() as session:
headers = {"Authorization": f"Bearer {self.api_config['api_key']}"}
async with session.get(
f"{self.api_config['base_url']}/traffic/{location}",
headers=headers
) as response:
return await response.json()
問題点:
- 🔄 コードの重複: 各エージェントで類似のデータアクセスコード
- 🔧 保守性の低下: データソース変更時の影響範囲が大
- 🔐 セキュリティリスク: 認証情報の分散管理
- 📊 非統一: データ形式やエラーハンドリングの不統一
MCPによる解決
# ✅ MCP経由でのデータアクセス
class CustomerAgent(BaseAgent):
def __init__(self):
super().__init__("customer_agent", "Customer Support Agent")
self.mcp_client = MCPClient()
async def get_customer_info(self, customer_id: str):
# 統一されたインターフェース
response = await self.mcp_client.query_data_source(
source="customer_database",
action="get_customer",
parameters={"customer_id": customer_id}
)
return response.data
class CityAgent(BaseAgent):
def __init__(self):
super().__init__("city_agent", "Smart City Management Agent")
self.mcp_client = MCPClient()
async def get_traffic_data(self, location: str):
# 同じインターフェース
response = await self.mcp_client.query_data_source(
source="traffic_api",
action="get_traffic_status",
parameters={"location": location}
)
return response.data
🧠 MCP + GPT-4o基盤推論システム統合 (NEW)
従来のMCPによるデータアクセス統一に加えて、GPT-4o基盤推論機能 による真のAI判断 を統合しました。
🔮 将来性: 現在はGPT-4oを使用していますが、GPT-5.1やOpenAI o1シリーズの専用推論モデルが利用可能になった際には、さらに高度な推論ベースのデータ選択が実現されます。
AI判断による動的データソース選択
# ✅ AI判断による最適データソース選択
class IntelligentMCPClient:
"""推論機能統合MCPクライアント"""
def __init__(self):
self.mcp_client = MCPClient()
self.reasoning_engine = AdvancedReasoningAgent(
"mcp_reasoning_agent",
"MCP Reasoning Agent",
["data_analysis", "source_optimization"]
)
async def intelligent_data_query(self, user_query: str) -> Dict[str, Any]:
"""AI判断による最適データクエリ実行"""
# GPT-4oによる推論分析(将来的にGPT-5.1/o1対応予定)
reasoning_result = await self.reasoning_engine.execute_deep_reasoning(f"""
ユーザークエリ: "{user_query}"
以下の観点から最適なデータソースとクエリ戦略を分析してください:
1. 必要な情報の種類と優先度
2. 利用可能なデータソース: {list(self.available_sources.keys())}
3. レスポンス時間とデータ品質のトレードオフ
4. 複数ソース統合の必要性
""")
# 推論結果に基づく実行計画
execution_plan = reasoning_result["recommended_solution"]["implementation_plan"]
results = {}
for phase in execution_plan:
for action in phase["actions"]:
if "query_source" in action:
source_result = await self._execute_data_query(
action["source"],
action["query_parameters"]
)
results[action["source"]] = source_result
# AI統合による結果最適化
integrated_result = await self._intelligent_result_integration(
results, user_query, reasoning_result
)
return integrated_result
async def _intelligent_result_integration(self,
source_results: Dict[str, Any],
original_query: str,
reasoning_context: Dict[str, Any]) -> Dict[str, Any]:
"""AI判断による結果統合"""
integration_prompt = f"""
元のクエリ: "{original_query}"
収集データ: {json.dumps(source_results, ensure_ascii=False, indent=2)}
推論コンテキスト: {reasoning_context}
以下を実行してください:
1. データ間の関連性分析
2. 矛盾点の特定と調整
3. ユーザーニーズに最適化した統合
4. 信頼性評価と根拠提示
"""
integration_result = await self.reasoning_engine.execute_deep_reasoning(
integration_prompt, ReasoningMode.STRATEGIC
)
return {
"integrated_data": integration_result["recommended_solution"],
"data_sources": list(source_results.keys()),
"confidence_score": integration_result["quality_assessment"]["confidence_score"],
"reasoning_trace": integration_result["reasoning_steps"]
}
推論駆動MCPサーバー
# src/mcp/servers/intelligent_knowledge_server.py
class IntelligentKnowledgeServer(MCPServer):
"""推論機能統合ナレッジサーバー"""
async def handle_intelligent_query(self, query_data: Dict[str, Any]) -> Dict[str, Any]:
"""AI推論による知識統合クエリ処理"""
query = query_data["query"]
context = query_data.get("context", {})
# マルチエージェント協調推論実行
reasoning_coordinator = MultiAgentReasoningCoordinator()
reasoning_task = ReasoningTask(
task_id=f"knowledge_query_{uuid.uuid4().hex[:8]}",
description=f"ナレッジベース統合クエリ: {query}",
complexity=self._assess_query_complexity(query),
required_expertise=["knowledge_management", "information_retrieval"],
constraints={"response_time": "fast", "accuracy": "high"},
success_criteria={"relevance": 0.9, "completeness": 0.85},
priority=3
)
# 協調推論による最適回答生成
reasoning_result = await reasoning_coordinator.execute_multi_agent_reasoning(
reasoning_task=reasoning_task,
orchestration_strategy=OrchestrationStrategy.ADAPTIVE
)
return {
"answer": reasoning_result["final_synthesis"]["integrated_solution"]["final_recommendation"],
"reasoning_process": reasoning_result["reasoning_results"],
"confidence": reasoning_result["final_synthesis"]["collaboration_quality"]["synthesis_confidence"],
"sources": self._extract_knowledge_sources(reasoning_result),
"session_id": reasoning_result["session_id"]
}
推論統合MCPの利点:
- 🎯 動的最適化: AIがクエリに応じて最適なデータソースを選択
- 🧠 知識統合: 複数ソースからの情報を推論により統合
- 📊 品質保証: AI評価による結果の信頼性判定
- 🔄 学習改善: 推論履歴からの継続的システム改善
🏗️ MCP アーキテクチャ
全体構成
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Customer │ │ Smart City │ │ Enterprise │
│ Agent │ │ Agent │ │ Agent │
└─────────┬───────┘ └─────────┬───────┘ └─────────┬───────┘
│ │ │
└──────────┬───────────┬──────────────────────┘
│ │
┌──────▼───────────▼──────┐
│ MCP Client Layer │
└──────┬───────────┬──────┘
│ │
┌────────────▼──┐ ┌────▼────────────┐
│ WebSocket │ │ WebSocket │
│ Connection │ │ Connection │
└────────────┬──┘ └────┬────────────┘
│ │
┌───────────────▼──┐ ┌────▼──────────────┐ ┌─────────────────┐
│ Customer Data │ │ City Data │ │ Enterprise Data │
│ MCP Server │ │ MCP Server │ │ MCP Server │
└───────┬──────────┘ └────┬──────────────┘ └─────────┬───────┘
│ │ │
┌───────▼──────────┐ ┌────▼──────────────┐ ┌─────────▼───────┐
│ Customer │ │ Traffic API │ │ HR Database │
│ Database │ │ Energy API │ │ Finance Database│
└──────────────────┘ └───────────────────┘ └─────────────────┘
MCPプロトコルの基本構造
# src/mcp/protocol.py
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional, Union
from pydantic import BaseModel
import websockets
import json
import asyncio
class MCPMessage(BaseModel):
"""MCP基本メッセージ形式"""
jsonrpc: str = "2.0"
id: Optional[Union[str, int]] = None
method: Optional[str] = None
params: Optional[Dict[str, Any]] = None
result: Optional[Dict[str, Any]] = None
error: Optional[Dict[str, Any]] = None
class MCPResource(BaseModel):
"""MCPリソース定義"""
uri: str
name: str
description: Optional[str] = None
mimeType: Optional[str] = None
class MCPTool(BaseModel):
"""MCPツール定義"""
name: str
description: str
inputSchema: Dict[str, Any]
class MCPServer(ABC):
"""MCP Server 基底クラス"""
def __init__(self, name: str, version: str = "1.0.0"):
self.name = name
self.version = version
self.resources: List[MCPResource] = []
self.tools: List[MCPTool] = []
self.clients = set()
@abstractmethod
async def handle_list_resources(self) -> List[MCPResource]:
"""利用可能なリソース一覧を返す"""
# 各サーバーで具体的なリソースを定義
# 例: データベーステーブル、APIエンドポイント等
return [
MCPResource(
uri="mcp://example/resource",
name="サンプルリソース",
description="実装例用リソース",
mimeType="application/json"
)
]
@abstractmethod
async def handle_read_resource(self, uri: str) -> Dict[str, Any]:
"""指定されたリソースを読み取る"""
# URIに基づいてリソースを取得
if uri == "mcp://example/resource":
return {
"content": {
"data": "サンプルデータ",
"timestamp": datetime.now().isoformat()
},
"mimeType": "application/json"
}
else:
raise ValueError(f"リソースが見つかりません: {uri}")
@abstractmethod
async def handle_call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""指定されたツールを実行する"""
# ツール名に基づいた処理を実行
if name == "example_tool":
return {
"result": f"ツール実行結果: {arguments}",
"status": "success",
"timestamp": datetime.now().isoformat()
}
else:
raise ValueError(f"未対応のツール: {name}")
async def start_server(self, host: str = "localhost", port: int = 8765):
"""WebSocketサーバー開始"""
print(f"Starting MCP Server '{self.name}' on {host}:{port}")
await websockets.serve(self.handle_client, host, port)
async def handle_client(self, websocket, path):
"""クライアント接続処理"""
self.clients.add(websocket)
try:
await self.client_session(websocket)
finally:
self.clients.remove(websocket)
async def client_session(self, websocket):
"""クライアントセッション管理"""
async for message in websocket:
try:
data = json.loads(message)
mcp_message = MCPMessage(**data)
response = await self.process_message(mcp_message)
if response:
await websocket.send(json.dumps(response.dict(exclude_none=True)))
except Exception as e:
error_response = MCPMessage(
id=data.get("id") if "data" in locals() else None,
error={
"code": -32603,
"message": "Internal error",
"data": str(e)
}
)
await websocket.send(json.dumps(error_response.dict(exclude_none=True)))
async def process_message(self, message: MCPMessage) -> Optional[MCPMessage]:
"""メッセージ処理のルーティング"""
if message.method == "initialize":
return await self.handle_initialize(message)
elif message.method == "resources/list":
resources = await self.handle_list_resources()
return MCPMessage(
id=message.id,
result={"resources": [r.dict() for r in resources]}
)
elif message.method == "resources/read":
uri = message.params.get("uri")
result = await self.handle_read_resource(uri)
return MCPMessage(
id=message.id,
result=result
)
elif message.method == "tools/list":
return MCPMessage(
id=message.id,
result={"tools": [t.dict() for t in self.tools]}
)
elif message.method == "tools/call":
name = message.params.get("name")
arguments = message.params.get("arguments", {})
result = await self.handle_call_tool(name, arguments)
return MCPMessage(
id=message.id,
result=result
)
else:
return MCPMessage(
id=message.id,
error={
"code": -32601,
"message": f"Method not found: {message.method}"
}
)
async def handle_initialize(self, message: MCPMessage) -> MCPMessage:
"""初期化処理"""
return MCPMessage(
id=message.id,
result={
"serverInfo": {
"name": self.name,
"version": self.version
},
"capabilities": {
"resources": {"subscribe": True, "listChanged": True},
"tools": {"listChanged": True}
}
}
)
🗃️ 具体的なMCPサーバー実装
1. カスタマーデータMCPサーバー
# src/mcp/servers/customer_server.py
class CustomerDataServer(MCPServer):
"""顧客データ専用MCPサーバー"""
def __init__(self):
super().__init__("customer-data-server", "1.0.0")
self.customer_database = {} # 実際にはデータベース接続
self.init_resources_and_tools()
self.init_sample_data()
def init_resources_and_tools(self):
"""リソースとツールの定義"""
# 利用可能なリソース
self.resources = [
MCPResource(
uri="customer://profiles",
name="Customer Profiles",
description="Customer profile information and history",
mimeType="application/json"
),
MCPResource(
uri="customer://tickets",
name="Support Tickets",
description="Customer support ticket database",
mimeType="application/json"
),
MCPResource(
uri="customer://analytics",
name="Customer Analytics",
description="Customer behavior analytics and insights",
mimeType="application/json"
)
]
# 利用可能なツール
self.tools = [
MCPTool(
name="get_customer_profile",
description="Retrieve detailed customer profile information",
inputSchema={
"type": "object",
"properties": {
"customer_id": {
"type": "string",
"description": "Unique customer identifier"
}
},
"required": ["customer_id"]
}
),
MCPTool(
name="search_customers",
description="Search customers by various criteria",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
},
"filters": {
"type": "object",
"description": "Additional search filters"
}
},
"required": ["query"]
}
),
MCPTool(
name="create_support_ticket",
description="Create a new customer support ticket",
inputSchema={
"type": "object",
"properties": {
"customer_id": {"type": "string"},
"issue_type": {"type": "string"},
"description": {"type": "string"},
"priority": {
"type": "string",
"enum": ["low", "medium", "high", "urgent"]
}
},
"required": ["customer_id", "issue_type", "description"]
}
)
]
def init_sample_data(self):
"""サンプルデータの初期化"""
self.customer_database = {
"12345": {
"id": "12345",
"name": "田中太郎",
"email": "tanaka@example.com",
"phone": "090-1234-5678",
"membership_level": "Premium",
"registration_date": "2021-03-15",
"last_activity": "2024-01-10",
"total_orders": 47,
"total_spent": 284750,
"preferred_contact": "email",
"support_history": [
{
"ticket_id": "TKT-001",
"date": "2024-01-05",
"issue": "商品未着",
"status": "resolved",
"resolution_time": "2時間"
}
]
},
"67890": {
"id": "67890",
"name": "佐藤花子",
"email": "sato@example.com",
"phone": "080-9876-5432",
"membership_level": "Standard",
"registration_date": "2023-06-20",
"last_activity": "2024-01-12",
"total_orders": 12,
"total_spent": 67890,
"preferred_contact": "phone"
}
}
self.ticket_database = {
"TKT-001": {
"id": "TKT-001",
"customer_id": "12345",
"issue_type": "delivery",
"description": "注文した商品が予定日を過ぎても届きません",
"priority": "high",
"status": "resolved",
"created_at": "2024-01-05T10:30:00Z",
"resolved_at": "2024-01-05T12:30:00Z",
"agent_id": "agent_001",
"resolution": "配送業者確認の結果、台風による遅延。代替配送手配済み"
}
}
async def handle_list_resources(self) -> List[MCPResource]:
"""利用可能なリソース一覧"""
return self.resources
async def handle_read_resource(self, uri: str) -> Dict[str, Any]:
"""リソース読み取り"""
if uri == "customer://profiles":
return {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(list(self.customer_database.values()),
ensure_ascii=False, indent=2)
}
]
}
elif uri == "customer://tickets":
return {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(list(self.ticket_database.values()),
ensure_ascii=False, indent=2)
}
]
}
elif uri == "customer://analytics":
analytics = self._generate_customer_analytics()
return {
"contents": [
{
"uri": uri,
"mimeType": "application/json",
"text": json.dumps(analytics, ensure_ascii=False, indent=2)
}
]
}
else:
raise ValueError(f"Unknown resource URI: {uri}")
async def handle_call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""ツール実行"""
if name == "get_customer_profile":
customer_id = arguments["customer_id"]
customer = self.customer_database.get(customer_id)
if customer:
return {
"success": True,
"data": customer
}
else:
return {
"success": False,
"error": f"Customer {customer_id} not found"
}
elif name == "search_customers":
query = arguments["query"].lower()
filters = arguments.get("filters", {})
results = []
for customer in self.customer_database.values():
# 簡単な検索実装
if (query in customer["name"].lower() or
query in customer["email"].lower()):
# フィルター適用
match = True
for key, value in filters.items():
if key in customer and customer[key] != value:
match = False
break
if match:
results.append(customer)
return {
"success": True,
"data": results,
"total_count": len(results)
}
elif name == "create_support_ticket":
ticket_id = f"TKT-{len(self.ticket_database) + 1:03d}"
ticket = {
"id": ticket_id,
"customer_id": arguments["customer_id"],
"issue_type": arguments["issue_type"],
"description": arguments["description"],
"priority": arguments.get("priority", "medium"),
"status": "open",
"created_at": datetime.now().isoformat(),
"agent_id": None
}
self.ticket_database[ticket_id] = ticket
return {
"success": True,
"data": ticket,
"estimated_resolution_time": self._estimate_resolution_time(ticket)
}
else:
raise ValueError(f"Unknown tool: {name}")
def _generate_customer_analytics(self) -> Dict[str, Any]:
"""顧客アナリティクス生成"""
total_customers = len(self.customer_database)
total_orders = sum(c.get("total_orders", 0) for c in self.customer_database.values())
total_revenue = sum(c.get("total_spent", 0) for c in self.customer_database.values())
membership_breakdown = {}
for customer in self.customer_database.values():
level = customer.get("membership_level", "Standard")
membership_breakdown[level] = membership_breakdown.get(level, 0) + 1
return {
"summary": {
"total_customers": total_customers,
"total_orders": total_orders,
"total_revenue": total_revenue,
"average_order_value": total_revenue / total_orders if total_orders > 0 else 0
},
"membership_breakdown": membership_breakdown,
"top_customers": sorted(
self.customer_database.values(),
key=lambda x: x.get("total_spent", 0),
reverse=True
)[:5]
}
def _estimate_resolution_time(self, ticket: Dict[str, Any]) -> str:
"""解決時間の見積もり"""
priority_times = {
"urgent": "1時間以内",
"high": "4時間以内",
"medium": "1営業日以内",
"low": "3営業日以内"
}
return priority_times.get(ticket["priority"], "3営業日以内")
# サーバー起動
async def start_customer_server():
server = CustomerDataServer()
await server.start_server("localhost", 8767)
if __name__ == "__main__":
asyncio.run(start_customer_server())
2. スマートシティデータMCPサーバー
# src/mcp/servers/city_server.py
class CityDataServer(MCPServer):
"""スマートシティデータ専用MCPサーバー"""
def __init__(self):
super().__init__("city-data-server", "1.0.0")
self.traffic_data = {}
self.energy_data = {}
self.emergency_data = {}
self.init_resources_and_tools()
self.init_sample_data()
def init_resources_and_tools(self):
"""リソースとツールの定義"""
self.resources = [
MCPResource(
uri="city://traffic",
name="Traffic Management",
description="Real-time traffic data and control systems",
mimeType="application/json"
),
MCPResource(
uri="city://energy",
name="Energy Management",
description="City-wide energy consumption and distribution",
mimeType="application/json"
),
MCPResource(
uri="city://emergency",
name="Emergency Response",
description="Emergency incidents and response coordination",
mimeType="application/json"
)
]
self.tools = [
MCPTool(
name="get_traffic_status",
description="Get current traffic status for specified locations",
inputSchema={
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "Location identifier or area name"
},
"radius": {
"type": "number",
"description": "Search radius in kilometers"
}
},
"required": ["location"]
}
),
MCPTool(
name="control_traffic_lights",
description="Control traffic light timing for optimization",
inputSchema={
"type": "object",
"properties": {
"intersection_id": {"type": "string"},
"timing_plan": {"type": "object"}
},
"required": ["intersection_id", "timing_plan"]
}
),
MCPTool(
name="get_energy_consumption",
description="Get energy consumption data for city zones",
inputSchema={
"type": "object",
"properties": {
"zone": {"type": "string"},
"timeframe": {"type": "string"}
},
"required": ["zone"]
}
),
MCPTool(
name="report_emergency",
description="Report and track emergency incidents",
inputSchema={
"type": "object",
"properties": {
"incident_type": {"type": "string"},
"location": {"type": "string"},
"severity": {"type": "string"},
"description": {"type": "string"}
},
"required": ["incident_type", "location", "severity"]
}
)
]
def init_sample_data(self):
"""サンプルデータの初期化"""
self.traffic_data = {
"新宿駅周辺": {
"location": "新宿駅周辺",
"congestion_level": "high",
"average_speed": 15.2,
"traffic_volume": 1250,
"incidents": [
{
"type": "工事",
"location": "新宿通り",
"impact": "1車線規制",
"duration": "2024-01-10〜2024-01-20"
}
],
"alternative_routes": [
"明治通り経由",
"青梅街道経由"
],
"last_updated": "2024-01-12T14:30:00Z"
},
"渋谷駅周辺": {
"location": "渋谷駅周辺",
"congestion_level": "medium",
"average_speed": 22.8,
"traffic_volume": 890,
"incidents": [],
"last_updated": "2024-01-12T14:30:00Z"
}
}
self.energy_data = {
"新宿区": {
"zone": "新宿区",
"current_consumption": 45.7, # MW
"peak_consumption": 52.3,
"renewable_percentage": 23.5,
"grid_status": "normal",
"demand_forecast": {
"next_hour": 47.2,
"next_4_hours": 51.8,
"peak_time": "18:00-19:00"
},
"last_updated": "2024-01-12T14:30:00Z"
}
}
async def handle_call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
"""ツール実行"""
if name == "get_traffic_status":
location = arguments["location"]
radius = arguments.get("radius", 1.0)
# 指定された場所の交通情報を検索
traffic_info = self.traffic_data.get(location)
if traffic_info:
return {
"success": True,
"data": traffic_info,
"recommendations": self._generate_traffic_recommendations(traffic_info)
}
else:
# 近隣エリアの検索(簡略化)
nearby_areas = [area for area in self.traffic_data.keys()
if location in area or area in location]
if nearby_areas:
return {
"success": True,
"data": [self.traffic_data[area] for area in nearby_areas],
"message": f"Nearby areas found for {location}"
}
else:
return {
"success": False,
"error": f"No traffic data available for {location}"
}
elif name == "control_traffic_lights":
intersection_id = arguments["intersection_id"]
timing_plan = arguments["timing_plan"]
# 交通信号制御の実装(シミュレーション)
control_result = {
"intersection_id": intersection_id,
"previous_timing": {"green": 45, "yellow": 3, "red": 42},
"new_timing": timing_plan,
"estimated_improvement": "15% reduction in wait time",
"implementation_time": "2024-01-12T15:00:00Z"
}
return {
"success": True,
"data": control_result
}
elif name == "get_energy_consumption":
zone = arguments["zone"]
timeframe = arguments.get("timeframe", "current")
energy_info = self.energy_data.get(zone)
if energy_info:
return {
"success": True,
"data": energy_info,
"optimization_suggestions": self._generate_energy_optimization(energy_info)
}
else:
return {
"success": False,
"error": f"No energy data available for zone {zone}"
}
elif name == "report_emergency":
emergency_id = f"EMG-{len(self.emergency_data) + 1:04d}"
emergency = {
"id": emergency_id,
"incident_type": arguments["incident_type"],
"location": arguments["location"],
"severity": arguments["severity"],
"description": arguments.get("description", ""),
"status": "reported",
"reported_at": datetime.now().isoformat(),
"response_units": self._assign_response_units(arguments),
"estimated_response_time": self._calculate_response_time(arguments)
}
self.emergency_data[emergency_id] = emergency
return {
"success": True,
"data": emergency,
"immediate_actions": self._generate_immediate_actions(emergency)
}
else:
raise ValueError(f"Unknown tool: {name}")
def _generate_traffic_recommendations(self, traffic_info: Dict[str, Any]) -> List[str]:
"""交通最適化の推奨事項"""
recommendations = []
if traffic_info["congestion_level"] == "high":
recommendations.extend([
"代替ルートの利用を推奨",
"公共交通機関の利用を検討",
"ピーク時間外の移動を推奨"
])
if traffic_info["incidents"]:
recommendations.append("工事・事故情報を確認してルート選択")
return recommendations
def _generate_energy_optimization(self, energy_info: Dict[str, Any]) -> List[str]:
"""エネルギー最適化の提案"""
suggestions = []
current = energy_info["current_consumption"]
peak = energy_info["peak_consumption"]
if current > peak * 0.9:
suggestions.extend([
"非重要施設の電力使用を削減",
"蓄電池からの放電開始",
"需要調整契約の発動検討"
])
if energy_info["renewable_percentage"] < 30:
suggestions.append("再生可能エネルギーの比率向上")
return suggestions
def _assign_response_units(self, emergency: Dict[str, Any]) -> List[str]:
"""緊急対応ユニットの割り当て"""
incident_type = emergency["incident_type"]
severity = emergency["severity"]
units = []
if incident_type in ["fire", "explosion"]:
units.extend(["消防車2台", "救急車1台"])
if severity == "high":
units.extend(["特殊救助車", "指揮車"])
elif incident_type in ["accident", "traffic"]:
units.extend(["パトカー1台", "救急車1台"])
if severity == "high":
units.append("交通機動隊")
elif incident_type == "medical":
units.append("救急車1台")
if severity == "high":
units.extend(["ドクターカー", "消防車"])
return units
def _calculate_response_time(self, emergency: Dict[str, Any]) -> str:
"""対応時間の計算"""
severity_times = {
"low": "15-20分",
"medium": "8-12分",
"high": "3-5分",
"critical": "1-3分"
}
return severity_times.get(emergency["severity"], "10-15分")
def _generate_immediate_actions(self, emergency: Dict[str, Any]) -> List[str]:
"""即座に実行すべきアクション"""
actions = [
f"対応ユニット {', '.join(emergency['response_units'])} に出動指示",
f"現場 {emergency['location']} の交通規制検討"
]
if emergency["severity"] in ["high", "critical"]:
actions.extend([
"近隣住民への避難指示準備",
"医療機関へのアラート送信",
"メディア対応準備"
])
return actions
# サーバー起動
async def start_city_server():
server = CityDataServer()
await server.start_server("localhost", 8768)
if __name__ == "__main__":
asyncio.run(start_city_server())
🔌 MCPクライアント実装
統一されたMCPクライアント
# src/mcp/client.py
class MCPClient:
"""MCP統一クライアント"""
def __init__(self):
self.connections = {}
self.server_configs = {
"customer_database": {
"url": "ws://localhost:8767",
"capabilities": ["resources", "tools"]
},
"city_management": {
"url": "ws://localhost:8768",
"capabilities": ["resources", "tools"]
},
"enterprise_systems": {
"url": "ws://localhost:8769",
"capabilities": ["resources", "tools"]
}
}
self.message_id_counter = 0
async def connect_to_server(self, server_name: str):
"""MCPサーバーへの接続"""
if server_name not in self.server_configs:
raise ValueError(f"Unknown server: {server_name}")
config = self.server_configs[server_name]
try:
websocket = await websockets.connect(config["url"])
# 初期化メッセージ送信
init_message = MCPMessage(
id=self._get_next_id(),
method="initialize",
params={
"protocolVersion": "2024-11-05",
"clientInfo": {
"name": "MultiAgent MCP Client",
"version": "1.0.0"
},
"capabilities": {
"resources": {"subscribe": True},
"tools": {}
}
}
)
await websocket.send(json.dumps(init_message.dict(exclude_none=True)))
# 応答を待機
response = await websocket.recv()
response_data = json.loads(response)
if "error" in response_data:
raise Exception(f"Initialization failed: {response_data['error']}")
self.connections[server_name] = websocket
print(f"Connected to MCP server: {server_name}")
return True
except Exception as e:
print(f"Failed to connect to {server_name}: {e}")
return False
async def query_data_source(self, source: str, action: str,
parameters: Dict[str, Any] = None) -> Dict[str, Any]:
"""データソースへのクエリ"""
server_name = self._get_server_for_source(source)
if server_name not in self.connections:
await self.connect_to_server(server_name)
websocket = self.connections[server_name]
# アクションに応じてMCPメソッドを決定
if action.startswith("get_") or action.startswith("search_"):
method = "tools/call"
params = {
"name": action,
"arguments": parameters or {}
}
else:
method = "tools/call"
params = {
"name": action,
"arguments": parameters or {}
}
# メッセージ送信
message = MCPMessage(
id=self._get_next_id(),
method=method,
params=params
)
await websocket.send(json.dumps(message.dict(exclude_none=True)))
# 応答受信
response = await websocket.recv()
response_data = json.loads(response)
if "error" in response_data:
raise Exception(f"MCP query failed: {response_data['error']}")
return response_data.get("result", {})
async def list_available_resources(self, server_name: str) -> List[Dict[str, Any]]:
"""利用可能なリソース一覧取得"""
if server_name not in self.connections:
await self.connect_to_server(server_name)
websocket = self.connections[server_name]
message = MCPMessage(
id=self._get_next_id(),
method="resources/list"
)
await websocket.send(json.dumps(message.dict(exclude_none=True)))
response = await websocket.recv()
response_data = json.loads(response)
return response_data.get("result", {}).get("resources", [])
async def list_available_tools(self, server_name: str) -> List[Dict[str, Any]]:
"""利用可能なツール一覧取得"""
if server_name not in self.connections:
await self.connect_to_server(server_name)
websocket = self.connections[server_name]
message = MCPMessage(
id=self._get_next_id(),
method="tools/list"
)
await websocket.send(json.dumps(message.dict(exclude_none=True)))
response = await websocket.recv()
response_data = json.loads(response)
return response_data.get("result", {}).get("tools", [])
def _get_server_for_source(self, source: str) -> str:
"""データソースに対応するサーバーを特定"""
source_mapping = {
"customer_database": "customer_database",
"customer_profiles": "customer_database",
"support_tickets": "customer_database",
"traffic_data": "city_management",
"energy_data": "city_management",
"emergency_data": "city_management",
"hr_database": "enterprise_systems",
"finance_database": "enterprise_systems",
"project_database": "enterprise_systems"
}
return source_mapping.get(source, "customer_database")
def _get_next_id(self) -> int:
"""次のメッセージIDを取得"""
self.message_id_counter += 1
return self.message_id_counter
async def close_all_connections(self):
"""全接続を閉じる"""
for server_name, websocket in self.connections.items():
try:
await websocket.close()
print(f"Closed connection to {server_name}")
except:
pass
self.connections.clear()
# エージェントでの使用例
class EnhancedCustomerAgent(BaseAgent):
"""MCP対応カスタマーエージェント"""
def __init__(self):
super().__init__("customer_agent", "Enhanced Customer Agent")
self.mcp_client = MCPClient()
async def get_customer_info(self, customer_id: str) -> Dict[str, Any]:
"""MCP経由での顧客情報取得"""
try:
result = await self.mcp_client.query_data_source(
source="customer_database",
action="get_customer_profile",
parameters={"customer_id": customer_id}
)
return result
except Exception as e:
return {
"success": False,
"error": f"Failed to retrieve customer info: {e}"
}
async def search_customers(self, query: str, filters: Dict[str, Any] = None):
"""MCP経由での顧客検索"""
try:
result = await self.mcp_client.query_data_source(
source="customer_database",
action="search_customers",
parameters={"query": query, "filters": filters or {}}
)
return result
except Exception as e:
return {
"success": False,
"error": f"Customer search failed: {e}"
}
🔧 MCPの利点と実用性
1. 統一されたデータアクセス
従来:
# 各エージェントが個別実装
customer_db = CustomerDatabase(host, user, password)
traffic_api = TrafficAPI(api_key, endpoint)
hr_system = HRSystem(oauth_token)
MCP使用:
# すべて統一インターフェース
mcp_client = MCPClient()
customer_data = await mcp_client.query_data_source("customer_database", "get_customer", params)
traffic_data = await mcp_client.query_data_source("traffic_data", "get_status", params)
hr_data = await mcp_client.query_data_source("hr_database", "get_employee", params)
2. エラーハンドリングの統一
class MCPErrorHandler:
"""MCP統一エラーハンドリング"""
@staticmethod
async def safe_query(mcp_client: MCPClient, source: str,
action: str, parameters: Dict[str, Any],
fallback_action: Callable = None) -> Dict[str, Any]:
"""安全なMCPクエリ実行"""
try:
result = await mcp_client.query_data_source(source, action, parameters)
return result
except ConnectionError:
logger.warning(f"Connection failed to {source}, retrying...")
await asyncio.sleep(1)
try:
return await mcp_client.query_data_source(source, action, parameters)
except:
if fallback_action:
return await fallback_action(parameters)
return {"success": False, "error": "Connection failed"}
except Exception as e:
logger.error(f"MCP query failed: {e}")
return {"success": False, "error": str(e)}
3. 設定の一元管理
# config/mcp_config.yaml
mcp_servers:
customer_database:
url: "ws://localhost:8767"
retry_attempts: 3
timeout: 30
capabilities: ["resources", "tools"]
city_management:
url: "ws://localhost:8768"
retry_attempts: 3
timeout: 30
capabilities: ["resources", "tools"]
enterprise_systems:
url: "ws://localhost:8769"
retry_attempts: 3
timeout: 30
capabilities: ["resources", "tools"]
# 設定読み込み
class MCPConfigManager:
@staticmethod
def load_config(config_path: str) -> Dict[str, Any]:
with open(config_path, 'r') as file:
return yaml.safe_load(file)
🎯 次回予告
次回の実践編 では、実際にシステムを動かしながら:
- 開発環境のセットアップ方法
- 実際の動作シーンの詳細解説
- パフォーマンス測定と最適化ポイント
- トラブルシューティングの実践方法
Model Context Protocolによって、マルチエージェントシステムのデータアクセス層が大幅に統一化・効率化されることを学びました。次回は、実際にシステムを動かして体験していきましょう。
📚 参考リンク
#MCP #ModelContextProtocol #WebSocket #DataIntegration #API #Python #MultiAgent