1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

OpenAI AgentSDKでMCPホストなFastAPIサーバを作ってみた

Last updated at Posted at 2025-04-11

OpenAI AgentSDK では以下のような記載で簡単にMCPサーバをツールとして利用できます。

async with MCPServerStdio(
    params={
        "command": "npx",
        "args": ["-y", "@modelcontextprotocol/server-filesystem", samples_dir],
    }
) as mcp_server:

    agent=Agent(
    name="Assistant",
    instructions="Use the tools to achieve the task",
    mcp_servers=[mcp_server]
)

今回はチャットボットWebアプリ(FastAPI)を実装しつつ上記のMCP呼び出しを含めたMCPホストを作ってみた。という内容になります。
作成したものは以下のようなWebアプリです。

Claude Desktopとか、VS Codeとかと同じような機能イメージで結構がっつりとUI作ってますが、95%はGitHub Copilot(Agent mode)の成果物です。ありがとうGitHub Copilot 💪💪

image.png

MCPサーバを複数登録できるMCPHub

2025/4/11現在、OpenAI AgentSDK の公式ドキュメントには複数のMCPサーバを取り扱うサンプルなどがありません。async withを重ねて実現することもできますが、チャットリクエストの都度MCPサーバのプロセスを起動するのはさすがにオーバーヘッドが大きいということで、MCPサーバを登録できるHubを作ります。

MCPHubの実装以下です。MCPServerの接続や切断はasyncioでEventloopで適切に処理するようにしています。

MCPHub.py
import asyncio
from typing import Dict, Optional

import logging
from agents.mcp import MCPServer

# Configure logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MCPHub:
    """
    MCPHub class for managing MCP server instances per user.
    Initialized when the FastAPI server starts and closed when it shuts down.
    """
    _instance = None

    @classmethod
    def get_instance(cls) -> 'MCPHub':
        """Get the singleton instance of MCPHub"""
        if cls._instance is None:
            cls._instance = MCPHub()
        return cls._instance

    def __init__(self):
        self._loop = None
        if MCPHub._instance is not None:
            logger.warning("MCPHub instance already exists. Use get_instance() to access it.")
            return
        logger.info("Initializing MCPHub")
        self.user_servers: Dict[str, Dict[str, MCPServer]] = {}
        self._action_queue = None  # 後でイベントループ上で初期化
        self._worker_task = None
        MCPHub._instance = self

    def set_event_loop(self, loop):
        self._loop = loop
        self._action_queue = asyncio.Queue()
        self._worker_task = asyncio.create_task(self._run())

    async def _run(self):
        while True:
            item = await self._action_queue.get()
            action = item[0]
            if action == "cleanup":
                _, email, tool_id = item
                server = self.get_server_for_user_tool(email, tool_id)
                if server:
                    try:
                        await server.cleanup()
                    except Exception as e:
                        logger.error(f"Cleanup error: {e}")
                    self.remove_server_for_user_tool(email, tool_id)
            elif action == "connect":
                _, email, tool_id, server_instance, future = item
                try:
                    await server_instance.connect()
                    logger.info(f"Connected and registered MCP server for user: {email}, tool: {tool_id}")
                    future.set_result(server_instance)
                except Exception as e:
                    logger.error(f"Connect error: {e}")
                    future.set_exception(e)

    def shutdown(self):
        if self._worker_task:
            self._worker_task.cancel()

    async def close(self):
        logger.info("Closing MCPHub and all MCP server instances")
        for email, tool_servers in self.user_servers.items():
            for tool_id, server in tool_servers.items():
                try:
                    await server.cleanup()
                except (GeneratorExit, RuntimeError) as e:
                    logger.warning(f"Cleanup error for {email}:{tool_id}: {e}")
                except Exception as e:
                    logger.error(f"Unexpected cleanup error for {email}:{tool_id}: {e}")
        self.user_servers.clear()
        logger.info("MCPHub closed")

    def schedule_cleanup(self, email: str, tool_id: str):
        self._action_queue.put_nowait(("cleanup", email, tool_id))

    async def cleanup_server(self, email: str, tool_id: str) -> bool:
        self.schedule_cleanup(email, tool_id)
        return True

    async def add_server_for_user_tool(self, email: str, tool_id: str, server_instance: MCPServer) -> MCPServer:
        if self._loop is None:
            self._loop = asyncio.get_running_loop()
        future = self._loop.create_future()
        if email not in self.user_servers:
            self.user_servers[email] = {}
        self.user_servers[email][tool_id] = server_instance
        self._action_queue.put_nowait(("connect", email, tool_id, server_instance, future))
        logger.info(f"Enqueued connect for user: {email}, tool: {tool_id}")
        try:
            result = await future
            return result
        except Exception as e:
            if email in self.user_servers and tool_id in self.user_servers[email]:
                del self.user_servers[email][tool_id]
                if not self.user_servers[email]:
                    del self.user_servers[email]
            raise e

    def get_servers_for_user(self, email: str) -> Dict[str, MCPServer]:
        return self.user_servers.get(email, {})

    def get_server_for_user_tool(self, email: str, tool_id: str) -> Optional[MCPServer]:
        user_tools = self.user_servers.get(email, {})
        return user_tools.get(tool_id)

    def remove_server_for_user_tool(self, email: str, tool_id: str) -> bool:
        if email in self.user_servers:
            user_tools = self.user_servers[email]
            if tool_id in user_tools:
                logger.info(f"Removing MCP server for user: {email}, tool: {tool_id}")
                del user_tools[tool_id]
                if not user_tools:
                    del self.user_servers[email]
                return True
        logger.warning(f"No MCP server exists for user: {email}, tool: {tool_id}")
        return False

その上で、以下のようにAgentを実行します。以下はストリームではないですが、ストリームの場合はRunner.run_streamedで実行して、event.data.deltaを扱えばOKです。

async def call_openai_model(usermail, modelname, chatmodel_title, chatmodel_instructions, chatmodel_tools, messages):

    set_client(modelname)

    mcp_hub = MCPHub.get_instance()
    mcp_servers_dict = mcp_hub.get_servers_for_user(usermail)

    mcp_servers = mcp_servers_dict.values()

    agent = Agent(
        name=chatmodel_title,
        instructions=chatmodel_instructions,
        mcp_servers=mcp_servers,
    )
    
    result = await Runner.run(agent, messages)
    return result.final_output

チャット履歴だとか、MCPの設定を登録する部分についてはGitHub Copilotとあれやこれやすればできると思うので、ここでは書きません。

できたもの

MCPサーバを複数設定で管理して、それぞれ個別に再生ボタンで起動し、チャットリクエストに対してMCPサーバを適宜呼び出して回答するチャットボットが完成しました。

下図はマイクロソフトのミッションを返す独自MCPサーバを起動して、ミッションを確認しているところです。関西弁なのは選択した標準モデルのプロンプトに”関西弁で回答してください”という指示があるからになります。

image.png

課題と考察 ~WebアプリMCPホストに思うところ

世の中の生成AIがらみのチャットボットは、ほぼほぼWebアプリだと思います。そこにMCP追加機能なんかがあると便利かなと思って実装してみましたが色々課題が見えてきました。

  • Stdio形式の場合サーバ内部にMCPサーバプロセスがどんどん増える
  • SSE形式の場合サーバからリモートMCPサーバに接続が貼りっぱなしになる
  • 利用するSDKによって上記のプロセスを正しく削除する方法
  • サーバからローカルファイルへのアクセスができないので、ローカルファイル操作系のMCPが使えない

リモートMCPサーバを前提として、ユーザには任意のMCPサーバを登録させない、なんかも方法としてはあるかもしれませんが、その場合は

  • 独自のMCPサーバの実装は、Function Callingでの実装との差別化が難しくなる
  • Azure App Serviceの場合はSSE接続が240/230秒でタイムアウトしてしまう

といった課題もでてくるかなと。まぁここは、いずれStreamable HTTPの実装に変わるでしょうし、そのタイミングでまたあれこれいじって評価してみたいなと思います。

やはりMCPホストはWebではなくてネイティブアプリが必要かしら。と思いつつも、やはりWebアプリでリモートMCPサーバを簡単に使えるのは色々とメリットがあると思いますので、平行してユースケースについて整理してみようかなと思ったところで本記事はおしまいです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?