HTTP with Server-Sent Events (SSE)
シンプルなリモート SSE 動作検証用。ダウンロードして即実行できるコードセット。
SSE (Server-Sent Events) MCP サーバーはリモートで実行され、HTTP エンドポイント経由で機能を提供する。stdio
サーバーとは異なり、アプリケーションのサブプロセスとして実行されるものではない。
準備
pip install "mcp[cli]" "uvicorn"
1. 超シンプル実装
1.1. MCPサーバー
from mcp.server.fastmcp import FastMCP
from fastapi import FastAPI
import random
from pydantic import Field
# MCP サーバーのインスタンスを作成
mcp = FastMCP("Travel Agent")
# Tool 1: おすすめの旅行先を提案
@mcp.tool()
def suggest_onsen_destination() -> str:
"""
おすすめの温泉地をランダムに提案します。
"""
destinations = [
"草津温泉(群馬県)",
"下呂温泉(岐阜県)",
"道後温泉(愛媛県)",
"別府温泉(大分県)",
"有馬温泉(兵庫県)",
"黒川温泉(熊本県)",
"城崎温泉(兵庫県)",
"銀山温泉(山形県)",
"熱海温泉(静岡県)",
"登別温泉(北海道)"
]
choice = random.choice(destinations)
print(f"おすすめの温泉: {choice}")
return choice
# Tool 2: 旅行予算の見積もり
@mcp.tool()
def estimate_travel_budget(
days: int = Field(..., description="旅行の日数(例:3)"),
people: int = Field(..., description="旅行者の人数(例:2)"),
per_day_per_person: int = Field(15000, description="1人1日あたりの予算(円、デフォルトは15,000円)")
) -> int:
"""人数と日数から旅行の予算を見積もります。"""
budget = days * people * per_day_per_person
print(f"Estimating budget: {days}日 x {people}人 x {per_day_per_person}円 = {budget}円")
return budget
# Tool 3: 観光地のベストシーズンをチェック
@mcp.tool()
def check_best_season(
destination: str = Field(..., description="観光地の名前(例:北海道、京都、沖縄など)")
) -> str:
"""指定された観光地のベストシーズンを返します。"""
seasons = {
"北海道": "6月〜8月(夏)",
"京都": "4月(桜)・11月(紅葉)",
"沖縄": "5月〜10月(海)",
"金沢": "3月(雪解け)・10月(文化祭)",
}
best = seasons.get(destination, "ベストシーズン情報は見つかりませんでした。")
print(f"{destination} のベストシーズン: {best}")
return best
# FastAPI アプリケーションを作成し、MCP の SSE エンドポイントをマウント
app = FastAPI()
app.mount("/", mcp.sse_app())
# サーバーを起動するには、以下のコマンドを実行してください:
# uvicorn server:app --host 127.0.0.1 --port 8000
uvicorn server:app --host 127.0.0.1 --port 8000
1.2. MCP クライアント
import asyncio
from mcp import ClientSession
from mcp.client.sse import sse_client
async def main():
# MCP SSE サーバーの URL を指定
server_url = "http://localhost:8000/sse"
# SSE クライアントを作成して接続
async with sse_client(server_url) as (read, write):
async with ClientSession(read, write) as session:
# セッションを初期化
await session.initialize()
# 利用可能なツールを取得
tools = await session.list_tools()
print("利用可能なツール:", [tool.name for tool in tools.tools])
# おすすめの旅行先を提案するツールを呼び出す
result = await session.call_tool("suggest_onsen_destination")
# 結果を表示
print(f"おすすめの旅行先は{result.content[0].text}です。")
# おすすめの旅行先を提案するツールを呼び出す
result = await session.call_tool("estimate_travel_budget",{ "days":3, "people":2, "per_day_per_person":15000})
# 結果を表示
print(f"旅行予算は{result.content[0].text}円です。")
# 観光地のベストシーズンをチェックするツールを呼び出す
result = await session.call_tool("check_best_season", {"destination":"北海道"})
# 結果を表示
print(f"北海道のベストシーズンは{result.content[0].text}です。")
if __name__ == "__main__":
asyncio.run(main())
python client.py
2. OpenAI Agent SDK 統合クライアント
Agent の mcp_servers
パラメータにセットするだけ。
pip install openai-agents python-dotenv
import os
import asyncio
from dotenv import load_dotenv
from typing import Any
from openai import AsyncAzureOpenAI
from agents import Agent, Runner, gen_trace_id, trace, set_default_openai_client
from agents import OpenAIChatCompletionsModel, set_tracing_disabled, set_tracing_export_api_key
from agents.mcp import MCPServer, MCPServerSse
from agents.model_settings import ModelSettings
# 環境変数と接続設定の読み込み
load_dotenv()
# Azure OpenAIのAPIキーとエンドポイントを環境変数から取得
api_version = "2025-03-01-preview"
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
apikey = os.getenv("AZURE_OPENAI_API_KEY")
model_deployment = os.getenv("AZURE_OPENAI_MODEL_DEPLOYMENT")
openai_client = AsyncAzureOpenAI(
api_key=apikey,
api_version=api_version,
azure_endpoint=endpoint,
)
# Set the default OpenAI client for the Agents SDK
set_default_openai_client(openai_client, use_for_tracing=False)
set_tracing_disabled(disabled=True) # Azure OpenAI 401 Error 回避
#set_tracing_export_api_key("") # もし使用する場合「OpenAIのAPIキー」が必要
async def run(mcp_server: MCPServer):
agent = Agent(
name="TravelAgent",
instructions="あなたは優秀なトラベルエージェントです。ツールを使って旅行の計画を手伝ってください。",
mcp_servers=[mcp_server],
model=OpenAIChatCompletionsModel(
model=model_deployment,
openai_client=openai_client
)
#model_settings=ModelSettings(tool_choice="required"),
)
# Run the `get_secret_word` tool
message = "おすすめの旅行先を提案して。"
print(f"\n\nRunning: {message}")
result = await Runner.run(starting_agent=agent, input=message)
print(result.final_output)
message = "2泊3日旅行での1人1日あたり2万円使う場合の旅行予算を見積もって。"
print(f"\n\nRunning: {message}")
result = await Runner.run(starting_agent=agent, input=message)
print(result.final_output)
message = "北海道のベストシーズンはいつ?"
print(f"\n\nRunning: {message}")
result = await Runner.run(starting_agent=agent, input=message)
print(result.final_output)
async def main():
async with MCPServerSse(
name="SSE Python Server",
params={
"url": "http://localhost:8000/sse",
},
) as server:
await run(server)
if __name__ == "__main__":
asyncio.run(main())
3. AutoGen v0.4.x 統合クライアント
MCP クライアントは v0.4.6 以降で実装され、AgentChat API で簡単に実装できる。Distributed Agent の構想は MCP で容易に実装できるように。
pip install "autogen-agentchat==0.4.9" "autogen-ext[mcp,openai,azure]==0.4.9"
3.1. AssistantAgent 実装
import os
import asyncio
from dotenv import load_dotenv
from autogen_ext.tools.mcp import SseServerParams, mcp_server_tools
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient
async def main() -> None:
load_dotenv()
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
apikey = os.getenv("AZURE_OPENAI_API_KEY")
model_deployment = os.getenv("AZURE_OPENAI_MODEL_DEPLOYMENT")
# Setup server params for remote service
server_params = SseServerParams(url="http://localhost:8000/sse")
# Get all available tools
tools = await mcp_server_tools(server_params)
client = AzureOpenAIChatCompletionClient(
azure_deployment=model_deployment,
model="gpt-4o",
api_key=apikey,
api_version="2025-03-01-preview",
azure_endpoint=endpoint,
)
# Create an agent with all tools
agent = AssistantAgent(
name="TravelAgent",
model_client=client,
tools=tools,
system_message="あなたは優秀なトラベルエージェントです。ツールを使って旅行の計画を手伝ってください。",
)
# Run AssistantAgent and call the tool.
result = await agent.run(task="おすすめの旅行先を提案して。")
print(result.messages[-1].content)
result = await agent.run(task="2泊3日旅行での1人1日あたり2万円使う場合の旅行予算を見積もって。")
print(result.messages[-1].content)
result = await agent.run(task="北海道のベストシーズンはいつ?")
print(result.messages[-1].content)
if __name__ == "__main__":
asyncio.run(main())
3.2. Selector Group Chat 実装
import os
from dotenv import load_dotenv
from autogen_ext.tools.mcp import SseServerParams, mcp_server_tools
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination, TimeoutTermination
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient
load_dotenv()
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
apikey = os.getenv("AZURE_OPENAI_API_KEY")
model_deployment = os.getenv("AZURE_OPENAI_MODEL_DEPLOYMENT")
# Setup server params for remote service
server_params = SseServerParams(url="http://localhost:8000/sse")
# Get all available tools
tools = await mcp_server_tools(server_params)
client = AzureOpenAIChatCompletionClient(
azure_deployment=model_deployment,
model="gpt-4o",
api_key=apikey,
api_version="2025-03-01-preview",
azure_endpoint=endpoint,
)
# Define termination condition
max_msg_termination = MaxMessageTermination(max_messages=10)
text_termination = TextMentionTermination("TERMINATE")
time_terminarion = TimeoutTermination(120)
combined_termination = max_msg_termination | text_termination | time_terminarion
planning_agent = AssistantAgent(
"PlanningAgent",
description="タスクを計画するエージェント。新しいタスクが与えられたときに最初に起動するエージェントであるべきである。",
model_client=client,
system_message="""
あなたはplanning agentです。
あなたの仕事は、複雑なタスクをより小さく、管理しやすいサブタスクに分解することです。
あなたのチームメンバーは次の通りです。
- travel_agent: 日本の観光の専門家
あなたは計画を立て、タスクを委任するだけで、自分で実行することはありません。
タスクを割り当てる際には、このフォーマットを使用してください:
1. <agent> : <task>
最終回答が完成したら調査結果を要約し、文の最後に TERMINATE を含めること!
""",
)
# Create an agent with all tools
travel_agent = AssistantAgent(
name="TravelAgent",
model_client=client,
tools=tools,
system_message="あなたは優秀なトラベルエージェントです。ツールを使って旅行の計画を手伝ってください。",
)
team = SelectorGroupChat(
[planning_agent, travel_agent],
model_client=client,
termination_condition=combined_termination,
)
task = "なんかいい旅行先ない?"
# Use asyncio.run(...) if you are running this in a script.
await Console(team.run_stream(task=task))
4. Azure AI Agent Service 統合クライアント
こんなアーキテクチャも構成可能に。
azure_agent_mcp_server
では以下のように Azure AI Foundry プロジェクトへの接続文字列とエージェント ID を指定するだけであとはクエリをラップしてくれる。
# Load environment variables
project_connection_string = os.getenv("PROJECT_CONNECTION_STRING")
default_agent_id = os.getenv("DEFAULT_AGENT_ID")
Security
実際には以下を考慮して実装すべし。
- 受信 SSE 接続の Origin ヘッダーを常に検証し、期待されるソースからのものであることを確認する
- ローカルで実行しているときは、サーバーをすべてのネットワークインターフェース(0.0.0.0)にバインドしないでください。代わりに、ローカルホスト(127.0.0.1)にのみバインドすること
- すべての SSE 接続に適切な認証を実装する
GitHub