0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Azure OpenAI Simple MCP over SSE Test

Last updated at Posted at 2025-04-23

HTTP with Server-Sent Events (SSE)

シンプルなリモート SSE 動作検証用。ダウンロードして即実行できるコードセット。

SSE (Server-Sent Events) MCP サーバーはリモートで実行され、HTTP エンドポイント経由で機能を提供する。stdio サーバーとは異なり、アプリケーションのサブプロセスとして実行されるものではない。

準備

pip install "mcp[cli]" "uvicorn"

1. 超シンプル実装

1.1. MCPサーバー

from mcp.server.fastmcp import FastMCP
from fastapi import FastAPI
import random
from pydantic import Field
# MCP サーバーのインスタンスを作成
mcp = FastMCP("Travel Agent")

# Tool 1: おすすめの旅行先を提案
@mcp.tool()
def suggest_onsen_destination() -> str:
    """
    おすすめの温泉地をランダムに提案します。
    """
    destinations = [
        "草津温泉(群馬県)",
        "下呂温泉(岐阜県)",
        "道後温泉(愛媛県)",
        "別府温泉(大分県)",
        "有馬温泉(兵庫県)",
        "黒川温泉(熊本県)",
        "城崎温泉(兵庫県)",
        "銀山温泉(山形県)",
        "熱海温泉(静岡県)",
        "登別温泉(北海道)"
    ]
    choice = random.choice(destinations)
    print(f"おすすめの温泉: {choice}")
    return choice

# Tool 2: 旅行予算の見積もり
@mcp.tool()
def estimate_travel_budget(
    days: int = Field(..., description="旅行の日数(例:3)"),
    people: int = Field(..., description="旅行者の人数(例:2)"),
    per_day_per_person: int = Field(15000, description="1人1日あたりの予算(円、デフォルトは15,000円)")
) -> int:
    """人数と日数から旅行の予算を見積もります。"""
    budget = days * people * per_day_per_person
    print(f"Estimating budget: {days}日 x {people}人 x {per_day_per_person}円 = {budget}")
    return budget

# Tool 3: 観光地のベストシーズンをチェック
@mcp.tool()
def check_best_season(
    destination: str = Field(..., description="観光地の名前(例:北海道、京都、沖縄など)")
) -> str:
    """指定された観光地のベストシーズンを返します。"""
    seasons = {
        "北海道": "6月〜8月(夏)",
        "京都": "4月(桜)・11月(紅葉)",
        "沖縄": "5月〜10月(海)",
        "金沢": "3月(雪解け)・10月(文化祭)",
    }
    best = seasons.get(destination, "ベストシーズン情報は見つかりませんでした。")
    print(f"{destination} のベストシーズン: {best}")
    return best

# FastAPI アプリケーションを作成し、MCP の SSE エンドポイントをマウント
app = FastAPI()
app.mount("/", mcp.sse_app())

# サーバーを起動するには、以下のコマンドを実行してください:
# uvicorn server:app --host 127.0.0.1 --port 8000
uvicorn server:app --host 127.0.0.1 --port 8000

1.2. MCP クライアント

import asyncio
from mcp import ClientSession
from mcp.client.sse import sse_client

async def main():
    # MCP SSE サーバーの URL を指定
    server_url = "http://localhost:8000/sse"

    # SSE クライアントを作成して接続
    async with sse_client(server_url) as (read, write):
        async with ClientSession(read, write) as session:
            # セッションを初期化
            await session.initialize()

            # 利用可能なツールを取得
            tools = await session.list_tools()
            print("利用可能なツール:", [tool.name for tool in tools.tools])

            # おすすめの旅行先を提案するツールを呼び出す
            result = await session.call_tool("suggest_onsen_destination")

            # 結果を表示
            print(f"おすすめの旅行先は{result.content[0].text}です。")

            # おすすめの旅行先を提案するツールを呼び出す
            result = await session.call_tool("estimate_travel_budget",{ "days":3, "people":2, "per_day_per_person":15000})

            # 結果を表示
            print(f"旅行予算は{result.content[0].text}円です。")

            # 観光地のベストシーズンをチェックするツールを呼び出す
            result = await session.call_tool("check_best_season", {"destination":"北海道"})
            # 結果を表示
            print(f"北海道のベストシーズンは{result.content[0].text}です。")

if __name__ == "__main__":
    asyncio.run(main())
python client.py

2. OpenAI Agent SDK 統合クライアント

Agent の mcp_servers パラメータにセットするだけ。

pip install openai-agents python-dotenv
import os
import asyncio
from dotenv import load_dotenv
from typing import Any
from openai import AsyncAzureOpenAI
from agents import Agent, Runner, gen_trace_id, trace, set_default_openai_client
from agents import OpenAIChatCompletionsModel, set_tracing_disabled, set_tracing_export_api_key
from agents.mcp import MCPServer, MCPServerSse
from agents.model_settings import ModelSettings

# 環境変数と接続設定の読み込み
load_dotenv()

# Azure OpenAIのAPIキーとエンドポイントを環境変数から取得
api_version = "2025-03-01-preview"
endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
apikey = os.getenv("AZURE_OPENAI_API_KEY")
model_deployment = os.getenv("AZURE_OPENAI_MODEL_DEPLOYMENT")

openai_client = AsyncAzureOpenAI(
    api_key=apikey,
    api_version=api_version,
    azure_endpoint=endpoint,
)
# Set the default OpenAI client for the Agents SDK
set_default_openai_client(openai_client, use_for_tracing=False)
set_tracing_disabled(disabled=True) # Azure OpenAI 401 Error 回避
#set_tracing_export_api_key("") # もし使用する場合「OpenAIのAPIキー」が必要

async def run(mcp_server: MCPServer):

    agent = Agent(
        name="TravelAgent",
        instructions="あなたは優秀なトラベルエージェントです。ツールを使って旅行の計画を手伝ってください。",
        mcp_servers=[mcp_server],
        model=OpenAIChatCompletionsModel( 
            model=model_deployment,
            openai_client=openai_client
        )
        #model_settings=ModelSettings(tool_choice="required"),
    )

    # Run the `get_secret_word` tool
    message = "おすすめの旅行先を提案して。"
    print(f"\n\nRunning: {message}")
    result = await Runner.run(starting_agent=agent, input=message)
    print(result.final_output)

    message = "2泊3日旅行での1人1日あたり2万円使う場合の旅行予算を見積もって。"
    print(f"\n\nRunning: {message}")
    result = await Runner.run(starting_agent=agent, input=message)
    print(result.final_output)

    message = "北海道のベストシーズンはいつ?"
    print(f"\n\nRunning: {message}")
    result = await Runner.run(starting_agent=agent, input=message)
    print(result.final_output)

async def main():
    async with MCPServerSse(
        name="SSE Python Server",
        params={
            "url": "http://localhost:8000/sse",
        },
    ) as server:
        await run(server)


if __name__ == "__main__":
    asyncio.run(main())

3. AutoGen v0.4.x 統合クライアント

MCP クライアントは v0.4.6 以降で実装され、AgentChat API で簡単に実装できる。Distributed Agent の構想は MCP で容易に実装できるように。

スクリーンショット 2025-04-23 21.50.13.png

pip install "autogen-agentchat==0.4.9" "autogen-ext[mcp,openai,azure]==0.4.9"

3.1. AssistantAgent 実装

import os
import asyncio
from dotenv import load_dotenv
from autogen_ext.tools.mcp import SseServerParams, mcp_server_tools
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient

async def main() -> None:
    load_dotenv()
    endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
    apikey = os.getenv("AZURE_OPENAI_API_KEY")
    model_deployment = os.getenv("AZURE_OPENAI_MODEL_DEPLOYMENT")

    # Setup server params for remote service
    server_params = SseServerParams(url="http://localhost:8000/sse")

    # Get all available tools
    tools = await mcp_server_tools(server_params)

    client = AzureOpenAIChatCompletionClient(
        azure_deployment=model_deployment,
        model="gpt-4o",
        api_key=apikey,
        api_version="2025-03-01-preview",
        azure_endpoint=endpoint,
    )

    # Create an agent with all tools
    agent = AssistantAgent(
        name="TravelAgent",
        model_client=client,
        tools=tools,
        system_message="あなたは優秀なトラベルエージェントです。ツールを使って旅行の計画を手伝ってください。",
    )
    
    # Run AssistantAgent and call the tool.
    result = await agent.run(task="おすすめの旅行先を提案して。")
    print(result.messages[-1].content)
    result = await agent.run(task="2泊3日旅行での1人1日あたり2万円使う場合の旅行予算を見積もって。")
    print(result.messages[-1].content)
    result = await agent.run(task="北海道のベストシーズンはいつ?")
    print(result.messages[-1].content)


if __name__ == "__main__":
    asyncio.run(main())

3.2. Selector Group Chat 実装

import os
from dotenv import load_dotenv
from autogen_ext.tools.mcp import SseServerParams, mcp_server_tools
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination, TextMentionTermination, TimeoutTermination
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient

load_dotenv()

endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
apikey = os.getenv("AZURE_OPENAI_API_KEY")
model_deployment = os.getenv("AZURE_OPENAI_MODEL_DEPLOYMENT")


# Setup server params for remote service
server_params = SseServerParams(url="http://localhost:8000/sse")

# Get all available tools
tools = await mcp_server_tools(server_params)

client = AzureOpenAIChatCompletionClient(
    azure_deployment=model_deployment,
    model="gpt-4o",
    api_key=apikey,
    api_version="2025-03-01-preview",
    azure_endpoint=endpoint,
)

# Define termination condition
max_msg_termination = MaxMessageTermination(max_messages=10)
text_termination = TextMentionTermination("TERMINATE")
time_terminarion = TimeoutTermination(120)
combined_termination = max_msg_termination | text_termination | time_terminarion

planning_agent = AssistantAgent(
    "PlanningAgent",
    description="タスクを計画するエージェント。新しいタスクが与えられたときに最初に起動するエージェントであるべきである。",
    model_client=client,
    system_message="""
    あなたはplanning agentです。
    あなたの仕事は、複雑なタスクをより小さく、管理しやすいサブタスクに分解することです。
    あなたのチームメンバーは次の通りです。
     - travel_agent: 日本の観光の専門家
    あなたは計画を立て、タスクを委任するだけで、自分で実行することはありません。

    タスクを割り当てる際には、このフォーマットを使用してください:
    1. <agent> : <task>

    最終回答が完成したら調査結果を要約し、文の最後に TERMINATE を含めること!
    """,
)
# Create an agent with all tools
travel_agent = AssistantAgent(
    name="TravelAgent",
    model_client=client,
    tools=tools,
    system_message="あなたは優秀なトラベルエージェントです。ツールを使って旅行の計画を手伝ってください。",
)

team = SelectorGroupChat(
    [planning_agent, travel_agent],
    model_client=client,
    termination_condition=combined_termination,
)

task = "なんかいい旅行先ない?"

# Use asyncio.run(...) if you are running this in a script.
await Console(team.run_stream(task=task))

4. Azure AI Agent Service 統合クライアント

こんなアーキテクチャも構成可能に。

スクリーンショット 2025-04-23 21.52.42.png

azure_agent_mcp_server では以下のように Azure AI Foundry プロジェクトへの接続文字列とエージェント ID を指定するだけであとはクエリをラップしてくれる。

# Load environment variables
project_connection_string = os.getenv("PROJECT_CONNECTION_STRING")
default_agent_id = os.getenv("DEFAULT_AGENT_ID")

Security

実際には以下を考慮して実装すべし。

  • 受信 SSE 接続の Origin ヘッダーを常に検証し、期待されるソースからのものであることを確認する
  • ローカルで実行しているときは、サーバーをすべてのネットワークインターフェース(0.0.0.0)にバインドしないでください。代わりに、ローカルホスト(127.0.0.1)にのみバインドすること
  • すべての SSE 接続に適切な認証を実装する

GitHub

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?