10
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MCPにAuth0を利用した認可機能を実装してみた

10
Last updated at Posted at 2025-12-17

この記事は、NTTテクノクロス Advent Calendar 2025 シリーズ2の18日目の記事になります。

皆さんこんにちは。NTTテクノクロスの西川です。
以前まではXR系の部署に所属していたのですが、今年度からセキュリティ系の部署に異動したため、心機一転してセキュリティに関連する記事を書いてみたいと思います。
セキュリティといってもセキュリティメインの話ではなく、最近LLM関連のキーワードとしてよく聞くようになっているModel Context Protocol (MCP) をメインにセキュアに運用するため必要となる認可機能を実装してみたという話をしたいと思います。

また本記事はOAuthを利用した認可処理を実装するため、「実装」を読む前にOAuthに関する基本的な知識があると、内容をより理解することができると思います。

MCP (Model Context Protocol)

MCPとは?

最近のLLMを利用することができるシステムとかだとMCPで連携することができますみたいな話をよく聞くようになっていると思いますが、そもそもMCPとは何でしょうか?
MCP自体はClaudeの開発で知られれているAnthropicが提唱しているLLMベースのAIアプリケーションが外部のリソースを簡単に利用するための通信規格みたいなものです。
(通信規格と言っていますが、MCP単体でRFCとかに登録されているわけではなく、まだまだ発展途上のものです。)
MCP自体の説明はMCPの公式ドキュメントでは、

Think of MCP like a USB-C port for AI applications. Just as USB-C provides a standardized way to connect electronic devices, MCP provides a standardized way to connect AI applications to external systems.
(機械翻訳)
MCPはAIアプリケーション用のUSB-Cポートのようなものだと考えてください。USB-Cが電子機器を接続するための標準化された方法を提供するのと同様に、MCPはAIアプリケーションを外部システムに接続するための標準化された方法を提供します。

と書かれており、これからAIアプリケーションに提供する外部機能を開発する際にはMCPに従って機能提供をすると簡単にAIアプリケーションへの容易な組み込みが可能になっていくものと思われます。
(記事執筆時点でも色々なAIアプリケーションが対応してきているので、デファクトスタンダードの立ち位置を獲得しつつあると思います。)

例えば、Visual Studio CodeにもMCPを用いてAIエージェントが外部のリソースにアクセスする手段が提供されており、
vscode_mcp.png

こんな感じで設定項目にも試験段階などではあるものの利用できる機能が増えつつあります。

MCPを利用するときに知っておきたい機能

ここでは、MCPを利用するにあたって知っておきたい情報をいくつか説明します。
まず、MCPはAIアプリケーション内で動作するMCPクライアントと外部リソースを提供するMCPサーバー間でプロトコルに従った情報のやり取りをすることで、AIアプリケーションとサーバー間で簡単にやり取りができるようになるというものです。

mcp_architecture.png
MCP公式ドキュメントから画像を引用)
そのため、AIアプリケーション側の開発者はクライアント側の仕様、AIアプリケーションに外部リソースを提供する開発者はサーバー側の仕様に従った実装を行うことで、市中に存在するMCP対応のアプリケーションとして利用/組み込みを行うことができるようになります。

また、MCPクライアント/サーバーが提供する機能としては、

  • MCPクライアント
    • Elicitation: サーバーからユーザーへの情報を要求
    • Roots:サーバー内での情報取得範囲の制限を付与
    • Sampling: サーバー側がユーザー側のLLM利用を要求
  • MCPサーバー
    • Tools: LLMにおけるツールの利用
    • Resources: LLMが利用できるデータの公開
    • Prompts: LLMが活用できるプロンプトのテンプレート公開

というようなものが用意されています。(2025-11-25時点の仕様
公式ドキュメントから確認できるAIアプリケーションにおける各機能への対応表を見てみると、この記事を執筆している時点(2025/12)では、MCPクライアントの機能に対応していないものが多く、MCPサーバー側の機能、特にTools機能はすべてのAIアプリケーションが対応していることが確認できます。

MCPが用いるデータの通信方式(Transports)としては、

  • Stdio transport(ローカルで立ち上げたプロセスとの標準入出力を利用した方式)
  • Streamable HTTP transport(HTTP通信を利用しリモートでのやりとりも可能な方式)

がデフォルトでサポートされています。
また実装しようと思えばCustom Transportsという形でWeb Socketなどで実装することも可能です。

MCPにおける認証/認可機能の対応

以前までのMCPサーバーを用いた外部リソースへのアクセスなどは基本的に

  • ローカルに記述したCredentialsなど
  • Stdio transportsで起動したローカルのプロセス

を利用することで外部のリソースにアクセスしていました。

ですが、実際にMCPサーバーを開発して幅広く利用してもらうためには、

  • Streamable HTTP transportsで立ち上げておいたMCPサーバー
  • OAuth/OIDCなどを用いた認可/認証による適切なアクセス権限の付与

によって実装されることが望ましいです。

一応、記事執筆時点における最新の仕様にはAuthorization(認可)に関する仕様が定まっており、OAuth 2.0/2.1を利用した適切なアクセス権限による利用が可能となっています。
(最近だと、2025/06/12GithubはOAuth 2.0 を利用したリモートのMCPサーバーアクセスを実現していたりします。)

そこで本記事では、実際にPython SDK for MCPAuth0を利用して適切なアクセス権限が付与されたユーザーのみに利用可能なMCPサーバーの実装をしていきたいと思います。

実装

今回の実装においては、ある程度無料で利用可能な認可サービスとして「Auth0」を、MCPクライアント/サーバーの実装にはPythonに慣れていることもあって、Python SDK for MCPを利用して実装してみたいと思います。

Auth0側の設定

まず初めに認可アプリケーションであるAuth0の設定を実施します。
そもそもAuth0とは、アプリケーションやWebサービス向けの認証/認可機能を簡単に導入/提供するためのクラウドサービスであり、無料からでも利用が可能です。

アプリケーションの作成

Auth0にログインしたら、まずはアプリケーションを作成します。
アプリケーション名は好きな名前にしてもらい、アプリケーションの種類を「一般的なWebアプリケーション」に設定し作成します。
create_application_auth0.png

次に作成したアプリケーションの「設定 → 基本情報」から、ドメインとクライアントID、クライアントシークレットを確認し、記録しておきます。ここで記録した値はMCPクライアントの実装で利用します。
application_base_settings.png

そのまま下にスクロールし、「アプリケーションのURI」から

というように設定します。

今回はPOCのような検証環境を想定しているため、暗号化しないHTTPプロトコルによるコールバックURLを設定しましたが、本番環境では適宜HTTPSプロトコルによるセキュアな通信などで読み替えて実装してください。

application_uri_settings.png

APIの作成

次に左側の選択肢から「アプリケーション → API」と選択しAPIを作成します。
この時に設定する識別子(identifier)は後から変更できない上に、MCPクライアントの実装で利用するため注意してください。
create_api_auth0.png

APIの設定における「RBACの設定」から「RBACを有効化」と「アクセストークンに権限を追加する」という項目をONにします。この操作を実施することで特定のユーザーのみにアクセス権限を付与することができるようになります。
api_rbac_settings.png
また、「アクセストークンの設定」が以下のようになっていることを確認し、JWTの署名アルゴリズムをプログラムのために記録しておきます。
api_token_settings.png

次の設定は「パーミッション」から「パーミッションを追加する」という項目を探し、以下のように設定します。
ここで設定したパーミッションはMCPサーバーに対するアクセス権限の確認で利用するため注意してください。

  • パーミッション: access:mcp
  • 説明: 基本的なMCPアクセス権限
    api_permission_settings.png

ユーザー管理の設定

次に実際にアクセス権限を付与するユーザーを設定します。
まずは「ユーザー管理 → ロール」に移動、「+ ロールを作成」を押下し、ロールを作成します。
作成したロールには「パーミッション」の「パーミッションを追加」を押下し、以下のようにして先ほど作成したAPIと紐づける形でアクセス権限を付与します。
user_role_settings.png

その次は「ユーザー管理 → ユーザー」から「+ ユーザーを作成」を押下し、メールアドレスとパスワードに適当な値を設定しユーザーを作成します。
作成したユーザーには「ロール」か「ロールの割り当て」を選択し、先ほど作成したロールを割り当てます。
auth0_user.png

ここまでの操作でAuth0側の設定は完了です。

AIアプリケーションの実装

ここからはPythonを利用したMCPクライアント/サーバーの実装を行います。
ソースコードはGithubに公開しているので気になった方は参照してみてください。
また実装したソースコードは、Python SDK for MCPで紹介されているサンプルプログラムを基に一部実装をいたしました。

筆者の実装環境は以下のようになっています。
注意点としては、mcpライブラリはまだまだ発展途上のためバージョンを筆者と合わせるかなるべく新しいバージョンを利用してください。
筆者は少し古いバージョンを利用したときにライブラリ側のバグでOAuthにおける認可トークンの受け取り先URLが想定していないものになったりしました。

  • パッケージ管理: uv
  • Python バージョン: 3.13
  • 利用パッケージとバージョン:
    • langchain==1.1.2
    • langchain-google-genai==4.0.0
    • langchain-mcp-adapters==0.1.14
    • mcp[cli]==1.23.1
    • pyjwt==2.10.1
    • selenium==4.38.0

.envの設定

まずはAuth0の設定で記録した値を基に.envファイルの設定を実施します。
AUTH0_AUDIENCEにはAPIで設定した識別子を設定します。

.env
# Auth0 Configuration
# Get these values from your Auth0 dashboard: https://manage.auth0.com/
AUTH0_DOMAIN=dev-xxxx.xx.auth0.com
AUTH0_CLIENT_ID=xxxxxxx
AUTH0_CLIENT_SECRET=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
AUTH0_AUDIENCE=xxxxxxxx
JWT_ALGORITHM=RS256

# MCP Server Configuration
MCP_HOST=localhost
MCP_PORT=5555

# These are defined in your Auth0 API settings
REQUIRED_PERMISSION=access:mcp

# MCP Client Configuration (for oauth_client.py)
# Callback URI must match the one configured in Auth0
OAUTH_CALLBACK_URI=http://localhost:5000/callback
OAUTH_CALLBACK_PORT=5000

GEMINI_API_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

今回はMCPサーバーのツール呼び出しにGeminiを利用したエージェントによる自動呼出しを実装したいと思っているため、GEMINI_API_KEYGoogle AI Studioで取得したAPIキーをセットします。

MCPクライアント

次にMCPクライアントを実装します。

ソースコード全文
mcp_client.py
import asyncio
from urllib.parse import parse_qs, urlparse, urlunparse, urlencode
import os
import time
from dotenv import load_dotenv
import logging
import webbrowser
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import httpx
from mcp.client.auth import OAuthFlowError
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.edge.options import Options as EdgeOptions
from selenium.common.exceptions import WebDriverException

from typing import Any

from pydantic import AnyUrl

from mcp import ClientSession
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.client.streamable_http import streamablehttp_client
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from langchain_mcp_adapters.client import MultiServerMCPClient  
from langchain.agents import create_agent
from langchain_google_genai import ChatGoogleGenerativeAI

# Setup logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# logger.setLevel(logging.DEBUG)

# Load environment variables
load_dotenv()

AUTH0_DOMAIN=os.environ["AUTH0_DOMAIN"]
AUTH0_CLIENT_ID=os.environ["AUTH0_CLIENT_ID"]
AUTH0_CLIENT_SECRET=os.environ["AUTH0_CLIENT_SECRET"]
AUTH0_AUDIENCE = os.environ["AUTH0_AUDIENCE"]

MCP_HOST=os.environ["MCP_HOST"]
MCP_PORT=int(os.environ["MCP_PORT"])

REQUIRED_PERMISSION=os.environ["REQUIRED_PERMISSION"]
OAUTH_CALLBACK_URI=os.environ["OAUTH_CALLBACK_URI"]
OAUTH_CALLBACK_PORT=int(os.environ["OAUTH_CALLBACK_PORT"])

def open_browser_incognito(url: str) -> bool:
    """
    Open URL in incognito/private browsing mode using Selenium.
    Tries multiple browsers in order: Chrome, Firefox, Edge.
    Falls back to normal mode if incognito mode fails.

    Returns:
        bool: True if successfully opened, False otherwise
    """
    # Try Chrome with incognito mode
    try:
        chrome_options = ChromeOptions()
        chrome_options.add_argument("--incognito")
        driver = webdriver.Chrome(options=chrome_options)
        driver.get(url)
        logger.info("Opened URL in Chrome incognito mode")
        return True
    except WebDriverException as e:
        logger.debug(f"Failed to open Chrome: {e}")
    except Exception as e:
        logger.debug(f"Failed to open Chrome: {e}")

    # Try Firefox with private browsing mode
    try:
        firefox_options = FirefoxOptions()
        firefox_options.add_argument("-private")
        driver = webdriver.Firefox(options=firefox_options)
        driver.get(url)
        logger.info("Opened URL in Firefox private mode")
        return True
    except WebDriverException as e:
        logger.debug(f"Failed to open Firefox: {e}")
    except Exception as e:
        logger.debug(f"Failed to open Firefox: {e}")

    # Try Edge with InPrivate mode
    try:
        edge_options = EdgeOptions()
        edge_options.add_argument("--inprivate")
        driver = webdriver.Edge(options=edge_options)
        driver.get(url)
        logger.info("Opened URL in Edge InPrivate mode")
        return True
    except WebDriverException as e:
        logger.debug(f"Failed to open Edge: {e}")
    except Exception as e:
        logger.debug(f"Failed to open Edge: {e}")

    # Fall back to default browser (normal mode)
    logger.warning("Could not open in incognito mode, falling back to normal browser")
    try:
        webbrowser.open(url)
        return True
    except Exception as e:
        logger.error(f"Failed to open browser: {e}")
        return False


class InMemoryTokenStorage(TokenStorage):
    """Demo In-memory token storage implementation."""

    def __init__(self):
        self.tokens: OAuthToken | None = None
        self.client_info: OAuthClientInformationFull | None = None

    async def get_tokens(self) -> OAuthToken | None:
        """Get stored tokens."""
        return self.tokens

    async def set_tokens(self, tokens: OAuthToken) -> None:
        """Store tokens."""
        time.sleep(1)  # iat delay to ensure token consistency
        self.tokens = tokens

    async def get_client_info(self) -> OAuthClientInformationFull | None:
        """Get stored client information."""
        return self.client_info

    async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
        """Store client information."""
        self.client_info = client_info


class OAuthCallbackHandler:
    """OAuth callback handler that opens browser and captures the callback."""

    def __init__(self, audience: str):
        self.auth_url: str = ""
        self.auth_code: str | None = None
        self.state: str | None = None
        self.error: str | None = None
        self.callback_received = threading.Event()
        self.audience: str = audience

        # Create and start HTTP server in a separate thread
        self.server = HTTPServer(('localhost', OAUTH_CALLBACK_PORT), self._create_callback_handler())
        self.server_thread = threading.Thread(target=self.server.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()
    
    def __del__(self):
        """Shutdown the HTTP server on deletion."""
        # Stop the server
        self.server.shutdown()
        self.server_thread.join(timeout=1)

    def _create_callback_handler(self):
        """Create an HTTP request handler for OAuth callback."""
        handler_self = self

        class CallbackHandler(BaseHTTPRequestHandler):
            def do_GET(self):
                """Handle GET request from OAuth callback."""
                # Parse the callback URL
                parsed_url = urlparse(self.path)
                params = parse_qs(parsed_url.query)

                # Extract authorization code and state
                handler_self.auth_code = params.get("code", [None])[0]
                handler_self.state = params.get("state", [None])[0]
                handler_self.error = params.get("error", [None])[0]

                # Send response to browser
                self.send_response(200)
                self.send_header("Content-type", "text/html; charset=utf-8")
                self.end_headers()

                if handler_self.error:
                    html_response = f"""
                    <!DOCTYPE html>
                    <html>
                    <head>
                        <title>認証エラー</title>
                        <style>
                            body {{ font-family: Arial, sans-serif; margin: 50px; background-color: #f8d7da; }}
                            .container {{ max-width: 600px; margin: 0 auto; padding: 20px; background: white; border-radius: 5px; }}
                            h1 {{ color: #721c24; }}
                        </style>
                    </head>
                    <body>
                        <div class="container">
                            <h1>❌ 認証に失敗しました</h1>
                            <p>エラー: {handler_self.error}</p>
                            <p>このウィンドウを閉じて、もう一度お試しください。</p>
                        </div>
                    </body>
                    </html>
                    """
                else:
                    html_response = """
                    <!DOCTYPE html>
                    <html>
                    <head>
                        <title>認証成功</title>
                        <style>
                            body { font-family: Arial, sans-serif; margin: 50px; background-color: #d4edda; }
                            .container { max-width: 600px; margin: 0 auto; padding: 20px; background: white; border-radius: 5px; }
                            h1 { color: #155724; }
                        </style>
                    </head>
                    <body>
                        <div class="container">
                            <h1>✅ 認証に成功しました!</h1>
                            <p>ログインが完了しました。このウィンドウを閉じて、アプリケーションにお戻りください。</p>
                        </div>
                    </body>
                    </html>
                    """

                self.wfile.write(html_response.encode('utf-8'))

                # Signal that callback was received
                handler_self.callback_received.set()

        return CallbackHandler

    async def handle_redirect(self, auth_url: str) -> None:
        """Handle the OAuth redirect by opening the browser automatically."""
        pr = urlparse(auth_url)
        d = parse_qs(pr.query)
        d["audience"] = [self.audience]
        # d["scope"] = ["openid profile email"]
        auth_url = urlunparse(pr._replace(query=urlencode(d, doseq=True)))

        self.auth_url = auth_url

        logger.info("=" * 60)
        logger.info("Authorization Required")
        logger.info("=" * 60)
        logger.info("Opening browser for authorization in incognito/private mode...")
        logger.info(f"\nAuthorization URL: {auth_url}\n")

        # Open browser automatically in incognito mode
        if not open_browser_incognito(auth_url):
            logger.error("Failed to open browser")

        logger.info("Please complete the login in your browser.")
        logger.info("=" * 60)

    async def handle_callback(self) -> tuple[str, str | None]:
        """Handle the OAuth callback by running a local HTTP server."""
        # Parse callback URI from environment to get port
        callback_uri = OAUTH_CALLBACK_URI
        parsed_callback = urlparse(callback_uri)
        port = parsed_callback.port or 5000

        logger.info(f"Starting local HTTP server on port {port}...")
        logger.info("Waiting for authorization callback...")

        # Wait for callback to be received
        await asyncio.get_event_loop().run_in_executor(
            None, self.callback_received.wait
        )

        if self.error:
            logger.error(f"Authorization failed: {self.error}")
            raise Exception(f"Authorization failed: {self.error}")

        if not self.auth_code:
            logger.error("No authorization code received")
            raise Exception("No authorization code received")

        logger.info("Authorization code received successfully")
        logger.debug("auth_code: %s", self.auth_code)
        logger.debug("state: %s", self.state)
        return self.auth_code, self.state


class OAuthClientProviderCustom(OAuthClientProvider):
    def __init__(self, audience: str, **kwargs):
        super().__init__(**kwargs)
        self.audience = audience
    
    # override to include audience in token request
    async def _exchange_token_authorization_code(
        self, auth_code: str, code_verifier: str, *, token_data: dict[str, Any] | None = {}
    ) -> httpx.Request:
        """Build token exchange request for authorization_code flow."""
        if self.context.client_metadata.redirect_uris is None:
            raise OAuthFlowError("No redirect URIs provided for authorization code grant")  # pragma: no cover
        if not self.context.client_info:
            raise OAuthFlowError("Missing client info")  # pragma: no cover

        token_url = self._get_token_endpoint()
        token_data = token_data or {}
        token_data.update(
            {
                "grant_type": "authorization_code",
                "code": auth_code,
                "redirect_uri": str(self.context.client_metadata.redirect_uris[0]),
                "client_id": self.context.client_info.client_id,
                "code_verifier": code_verifier,
                "audience": self.audience,
            }
        )

        # Only include resource param if conditions are met
        if self.context.should_include_resource_param(self.context.protocol_version):
            token_data["resource"] = self.context.get_resource_url()  # RFC 8707

        if self.context.client_info.client_secret:
            token_data["client_secret"] = self.context.client_info.client_secret
        
        logger.debug(f"Token request data: {token_data}")

        return httpx.Request(
            "POST", token_url, data=token_data, headers={"Content-Type": "application/x-www-form-urlencoded"}
        )


async def main():
    """Run the Auth0 MCP client."""
    mcp_url = f"http://{MCP_HOST}:{MCP_PORT}"

    logger.info(f"Connecting to MCP server at: {mcp_url}")
    logger.info(f"Using callback URI: {OAUTH_CALLBACK_URI}")
    logger.debug(f"Using Auth0 client ID: {AUTH0_CLIENT_ID}")
    logger.debug(f"Using Auth0 client secret: {AUTH0_CLIENT_SECRET}")

    # Create storage for tokens and pre-registered client info
    storage = InMemoryTokenStorage()

    # Pre-populate with registered client information
    await storage.set_client_info(OAuthClientInformationFull(
        client_id=AUTH0_CLIENT_ID,
        client_secret=AUTH0_CLIENT_SECRET,
        redirect_uris=[AnyUrl(OAUTH_CALLBACK_URI)],
        client_metadata=OAuthClientMetadata(
            client_name="Auth0 MCP Client",
            redirect_uris=[AnyUrl(OAUTH_CALLBACK_URI)],
            grant_types=["authorization_code", "refresh_token"],
            response_types=["code"],
            scope=REQUIRED_PERMISSION,
        )
    ))

    # Create callback handler instance that can share state
    callback_handler = OAuthCallbackHandler(audience=AUTH0_AUDIENCE)

    # Create OAuth client provider with pre-registered client
    oauth_auth = OAuthClientProviderCustom(
        audience=AUTH0_AUDIENCE,
        server_url=mcp_url,
        client_metadata=OAuthClientMetadata(
            client_name="Auth0 MCP Client",
            redirect_uris=[AnyUrl(OAUTH_CALLBACK_URI)],
            grant_types=["authorization_code", "refresh_token"],
            response_types=["code"],
            scope=REQUIRED_PERMISSION,
        ),
        storage=storage,
        redirect_handler=callback_handler.handle_redirect,
        callback_handler=callback_handler.handle_callback,
    )

    tool_names = []

    try:
        async with streamablehttp_client(url=f"{mcp_url}/mcp", auth=oauth_auth) as (read, write, _):
            async with ClientSession(read, write) as session:
                logger.info("Initializing session...")
                await session.initialize()
                logger.info("Session initialized successfully")
                session

                # List available tools
                tools = await session.list_tools()
                tool_names = [tool.name for tool in tools.tools]
                logger.info(f"Available tools: {tool_names}")

                # Call a tool as an example
                if tools.tools:
                    logger.info("\nCalling 'get_time' tool...")
                    result = await session.call_tool("get_time", arguments={})
                    logger.info(f"Result: {result.content}")

    except Exception as e:
        logger.error(f"Error connecting to MCP server: {str(e)}")
        raise

    logger.debug(f"OAuth token: {await storage.get_tokens()}")
    
    model = ChatGoogleGenerativeAI(
        model="gemini-2.5-flash", 
        google_api_key=os.environ["GOOGLE_API_KEY"]
    )
    token = await storage.get_tokens()
    token = token.access_token

    tools_info = [
        {
            name: {
                "transport": "streamable_http",
                "url": f"{mcp_url}/mcp",
                "auth": oauth_auth,
                # "headers": {
                #     "Authorization": f"Bearer {token}"  # use existing token
                # },
            }
        } for name in tool_names
    ]
    tools_dict = {}
    for tool in tools_info:
        tools_dict.update(tool)
    client = MultiServerMCPClient(tools_dict)
    tools = await client.get_tools()

    agent = create_agent(model=model, tools=tools)
    user_message = input("User input: ")
    response = await agent.ainvoke({"messages": user_message})
    logger.info(f"Agent response: {response["messages"][-1].content}")


if __name__ == "__main__":
        # Normal mode - connect to MCP server
        asyncio.run(main())

MCPに関連する部分を重点的に解説していきます。

トークン保存先の定義
class InMemoryTokenStorage(TokenStorage):
    """Demo In-memory token storage implementation."""

    def __init__(self):
        self.tokens: OAuthToken | None = None
        self.client_info: OAuthClientInformationFull | None = None

    async def get_tokens(self) -> OAuthToken | None:
        """Get stored tokens."""
        return self.tokens

    async def set_tokens(self, tokens: OAuthToken) -> None:
        """Store tokens."""
        time.sleep(1)  # iat delay to ensure token consistency
        self.tokens = tokens

    async def get_client_info(self) -> OAuthClientInformationFull | None:
        """Get stored client information."""
        return self.client_info

    async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
        """Store client information."""
        self.client_info = client_info

これは、認可が成功した場合にアクセストークンなどの情報を管理するためのクラスです。
Python SDK for MCPによるOAuth認証フローが成功した場合は、ここにトークンなどが格納されていきます。
また、set_tokensメソッドにsleep処理が入っていますが、これはある時からJWTの発行された時間を示すiatパラメータの検証で失敗するようになってしまったため、時間の不整合を取り除くための暫定的な処理になります。(おそらくどこかのタイミングでサーバー側の時間とAuth0の時間が合致しなくなってしまったのだと思われるため、不要であれば削除しても問題ないです。)
一度アクセストークンが設定されたインスタンスを使いまわすと再度アクセストークンを取得せずともセキュアなMCPサーバーの利用が可能になります。

OAuthフローにおけるリダイレクト処理
class OAuthCallbackHandler:
    """OAuth callback handler that opens browser and captures the callback."""

    def __init__(self, audience: str):
        self.auth_url: str = ""
        self.auth_code: str | None = None
        self.state: str | None = None
        self.error: str | None = None
        self.callback_received = threading.Event()
        self.audience: str = audience

        # Create and start HTTP server in a separate thread
        self.server = HTTPServer(('localhost', OAUTH_CALLBACK_PORT), self._create_callback_handler())
        self.server_thread = threading.Thread(target=self.server.serve_forever)
        self.server_thread.daemon = True
        self.server_thread.start()
    
    def __del__(self):
        """Shutdown the HTTP server on deletion."""
        # Stop the server
        self.server.shutdown()
        self.server_thread.join(timeout=1)

    def _create_callback_handler(self):
        """Create an HTTP request handler for OAuth callback."""
        handler_self = self

        class CallbackHandler(BaseHTTPRequestHandler):
            def do_GET(self):
                """Handle GET request from OAuth callback."""
                # Parse the callback URL
                parsed_url = urlparse(self.path)
                params = parse_qs(parsed_url.query)

                # Extract authorization code and state
                handler_self.auth_code = params.get("code", [None])[0]
                handler_self.state = params.get("state", [None])[0]
                handler_self.error = params.get("error", [None])[0]

                # Send response to browser
                self.send_response(200)
                self.send_header("Content-type", "text/html; charset=utf-8")
                self.end_headers()

                if handler_self.error:
                    html_response = f"""
                    <!DOCTYPE html>
                    <html>
                    <head>
                        <title>認証エラー</title>
                        <style>
                            body {{ font-family: Arial, sans-serif; margin: 50px; background-color: #f8d7da; }}
                            .container {{ max-width: 600px; margin: 0 auto; padding: 20px; background: white; border-radius: 5px; }}
                            h1 {{ color: #721c24; }}
                        </style>
                    </head>
                    <body>
                        <div class="container">
                            <h1>❌ 認証に失敗しました</h1>
                            <p>エラー: {handler_self.error}</p>
                            <p>このウィンドウを閉じて、もう一度お試しください。</p>
                        </div>
                    </body>
                    </html>
                    """
                else:
                    html_response = """
                    <!DOCTYPE html>
                    <html>
                    <head>
                        <title>認証成功</title>
                        <style>
                            body { font-family: Arial, sans-serif; margin: 50px; background-color: #d4edda; }
                            .container { max-width: 600px; margin: 0 auto; padding: 20px; background: white; border-radius: 5px; }
                            h1 { color: #155724; }
                        </style>
                    </head>
                    <body>
                        <div class="container">
                            <h1>✅ 認証に成功しました!</h1>
                            <p>ログインが完了しました。このウィンドウを閉じて、アプリケーションにお戻りください。</p>
                        </div>
                    </body>
                    </html>
                    """

                self.wfile.write(html_response.encode('utf-8'))

                # Signal that callback was received
                handler_self.callback_received.set()

        return CallbackHandler

    async def handle_redirect(self, auth_url: str) -> None:
        """Handle the OAuth redirect by opening the browser automatically."""
        pr = urlparse(auth_url)
        d = parse_qs(pr.query)
        d["audience"] = [self.audience]
        # d["scope"] = ["openid profile email"]
        auth_url = urlunparse(pr._replace(query=urlencode(d, doseq=True)))

        self.auth_url = auth_url

        logger.info("=" * 60)
        logger.info("Authorization Required")
        logger.info("=" * 60)
        logger.info("Opening browser for authorization in incognito/private mode...")
        print(f"\nAuthorization URL: {auth_url}\n")

        # Open browser automatically in incognito mode
        if not open_browser_incognito(auth_url):
            logger.error("Failed to open browser")

        logger.info("Please complete the login in your browser.")
        logger.info("=" * 60)

    async def handle_callback(self) -> tuple[str, str | None]:
        """Handle the OAuth callback by running a local HTTP server."""
        # Parse callback URI from environment to get port
        callback_uri = OAUTH_CALLBACK_URI
        parsed_callback = urlparse(callback_uri)
        port = parsed_callback.port or 5000

        logger.info(f"Starting local HTTP server on port {port}...")
        logger.info("Waiting for authorization callback...")

        # Wait for callback to be received
        await asyncio.get_event_loop().run_in_executor(
            None, self.callback_received.wait
        )

        if self.error:
            logger.error(f"Authorization failed: {self.error}")
            raise Exception(f"Authorization failed: {self.error}")

        if not self.auth_code:
            logger.error("No authorization code received")
            raise Exception("No authorization code received")

        logger.info("Authorization code received successfully")
        logger.info("auth_code: %s", self.auth_code)
        logger.info("state: %s", self.state)
        return self.auth_code, self.state

上記の部分はOAuthによる認可コードの取得とアクセストークンの取得を主に担当している部分になります。

handle_redirectメソッドはauth_redirect引数で受けとった認可サーバーへのリクエストのためのURLを基に、ブラウザ経由で利用者が対象のリソースに対して権限の承認を確認するために用意するコールバック関数です。このコールバック関数を呼び出すOAuthClientProviderはAuthorization Code Grantを利用したアクセスを要求するため、受け取ったURLをブラウザでアクセスし、認可コードの取得を試みるようにしています。
handle_callbackメソッドに関しては、認可サーバーとのやり取りでアクセスが承認された場合に、呼び出し元のOAuthClientProviderに認可コードとステートを伝えるためのコールバック関数になっています。処理としては、あらかじめ起動しておいたローカルサーバーでアクセスが承認された場合のリダイレクトとパラメータを受け取っておき、そのパラメータに含まれていた認可コードとステートを呼び出し元に返すというものになっています。

また、上記の処理だけだと認可コードしか発行されておらず、実際のMCPサーバーへのアクセスに利用されるアクセストークンが発行されていませんが、利用元のOAuthClientProviderクラスが自動でアクセストークンへの交換を実施し、InMemoryTokenStorageインスタンスに格納してくれます。

audience属性設定のためのカスタムOAuthClientProvider定義
class OAuthClientProviderCustom(OAuthClientProvider):
    def __init__(self, audience: str, **kwargs):
        super().__init__(**kwargs)
        self.audience = audience
    
    # override to include audience in token request
    async def _exchange_token_authorization_code(
        self, auth_code: str, code_verifier: str, *, token_data: dict[str, Any] | None = {}
    ) -> httpx.Request:
        """Build token exchange request for authorization_code flow."""
        if self.context.client_metadata.redirect_uris is None:
            raise OAuthFlowError("No redirect URIs provided for authorization code grant")  # pragma: no cover
        if not self.context.client_info:
            raise OAuthFlowError("Missing client info")  # pragma: no cover

        token_url = self._get_token_endpoint()
        token_data = token_data or {}
        token_data.update(
            {
                "grant_type": "authorization_code",
                "code": auth_code,
                "redirect_uri": str(self.context.client_metadata.redirect_uris[0]),
                "client_id": self.context.client_info.client_id,
                "code_verifier": code_verifier,
                "audience": self.audience,
            }
        )

        # Only include resource param if conditions are met
        if self.context.should_include_resource_param(self.context.protocol_version):
            token_data["resource"] = self.context.get_resource_url()  # RFC 8707

        if self.context.client_info.client_secret:
            token_data["client_secret"] = self.context.client_info.client_secret
        
        logger.debug(f"Token request data: {token_data}")

        return httpx.Request(
            "POST", token_url, data=token_data, headers={"Content-Type": "application/x-www-form-urlencoded"}
        )

上記の部分は、OAuthClientProviderクラスを継承した独自クラスのOAuthClientProviderCustom定義をしている部分になります。
この部分はAuth0の仕様として、認可コードを利用したアクセストークンの発行にaudienceを指定しないと、想定していたJWTではなく、共通鍵不明のJWEが発行されてしまうという事象へ対処するために実装しています。
(詳細はこのサイトを参考)
実装している内容としては、コンストラクタ処理で追加した引数audienceを受け取り、それを内部向けの_exchange_token_authorization_codeメソッドをオーバーライドする形で利用することで、アクセストークンの取得までを自動で実施するというものになっています。

各種OAuth設定値の設定
# Create storage for tokens and pre-registered client info
storage = InMemoryTokenStorage()

# Pre-populate with registered client information
await storage.set_client_info(OAuthClientInformationFull(
    client_id=AUTH0_CLIENT_ID,
    client_secret=AUTH0_CLIENT_SECRET,
    redirect_uris=[AnyUrl(OAUTH_CALLBACK_URI)],
    client_metadata=OAuthClientMetadata(
        client_name="Auth0 MCP Client",
        redirect_uris=[AnyUrl(OAUTH_CALLBACK_URI)],
        grant_types=["authorization_code", "refresh_token"],
        response_types=["code"],
        scope=REQUIRED_PERMISSION,
    )
))

# Create callback handler instance that can share state
callback_handler = OAuthCallbackHandler(audience=AUTH0_AUDIENCE)

# Create OAuth client provider with pre-registered client
oauth_auth = OAuthClientProviderCustom(
    audience=AUTH0_AUDIENCE,
    server_url=mcp_url,
    client_metadata=OAuthClientMetadata(
        client_name="Auth0 MCP Client",
        redirect_uris=[AnyUrl(OAUTH_CALLBACK_URI)],
        grant_types=["authorization_code", "refresh_token"],
        response_types=["code"],
        scope=REQUIRED_PERMISSION,
    ),
    storage=storage,
    redirect_handler=callback_handler.handle_redirect,
    callback_handler=callback_handler.handle_callback,
)

上記はOAuthを利用した認可フローを実施するために必要な情報を登録する部分になっています。
ここで適切に設定したOAuthClientProviderCustomインスタンスをstreamablehttp_clientメソッドやMultiServerMCPClientのコンストラクタに利用することでOAuthを利用したセキュアなMCPサーバーの利用を実現することができます。

MCPサーバー

ソースコード全文
mcp_server.py
from pydantic import AnyHttpUrl

from mcp.server.auth.provider import AccessToken, TokenVerifier
from mcp.server.auth.settings import AuthSettings
from mcp.server.fastmcp import FastMCP
import datetime
from dotenv import load_dotenv
import os
import jwt
from jwt import PyJWKClient
import logging

# Setup logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# logger.setLevel(logging.DEBUG)

# Load environment variables
load_dotenv()

AUTH0_DOMAIN=os.environ["AUTH0_DOMAIN"]
AUTH0_CLIENT_ID=os.environ["AUTH0_CLIENT_ID"]
AUTH0_CLIENT_SECRET=os.environ["AUTH0_CLIENT_SECRET"]
AUTH0_AUDIENCE = os.environ["AUTH0_AUDIENCE"]

MCP_HOST=os.environ["MCP_HOST"]
MCP_PORT=int(os.environ["MCP_PORT"])

REQUIRED_PERMISSION=os.environ["REQUIRED_PERMISSION"]
OAUTH_CALLBACK_URI=os.environ["OAUTH_CALLBACK_URI"]
OAUTH_CALLBACK_PORT=int(os.environ["OAUTH_CALLBACK_PORT"])

JWT_ALGORITHM = os.environ["JWT_ALGORITHM"]


class Auth0TokenVerifier(TokenVerifier):
    """Auth0 token verifier with flexible authorization modes."""

    async def verify_token(self, token: str) -> AccessToken | None:
        """Verify Auth0 JWT token and check user authorization."""
        try:
            # Log token for debugging (first 50 chars only for security)
            logger.debug(f"Received token: {token}")

            # Get JWKS from Auth0
            jwks_url = f"https://{AUTH0_DOMAIN}/.well-known/jwks.json"
            jwks_client = PyJWKClient(jwks_url)
            signing_key = jwks_client.get_signing_key_from_jwt(token).key


            # Decode and verify the token
            payload = jwt.decode(
                token,
                signing_key,
                algorithms=[JWT_ALGORITHM],
                audience=AUTH0_AUDIENCE,
                issuer=f"https://{AUTH0_DOMAIN}/"
            )

            logger.debug(f"Decoded payload: {payload}")

            # Extract user information
            user_id = payload.get("sub")
            scopes = payload.get("scope", "").split()
            expires_at = payload.get("exp", 0)
            # permissions = payload.get("permissions", [])
            # roles = payload.get("https://mcp-server/roles", [])  # Custom claim for roles

            logger.debug(f"Token verified for user: {user_id}")
            # logger.debug(f"User permissions: {permissions}")
            # logger.debug(f"User roles: {roles}")
            logger.debug(f"User scopes: {scopes}")
            logger.debug(f"User ID: {user_id}")
            logger.debug(f"Token expires at: {expires_at}")

            # Return AccessToken with user information
            return AccessToken(
                token=token,
                client_id=user_id,
                expires_at=expires_at,
                scopes=scopes,
            )

        except jwt.ExpiredSignatureError:
            logger.error("Token has expired")
            return None
        except jwt.InvalidTokenError as e:
            logger.error(f"Invalid token: {str(e)}")
            return None
        except Exception as e:
            logger.error(f"Token verification failed: {str(e)}")
            return None


# Create FastMCP instance as a Resource Server
mcp = FastMCP(
    "Auth0 Protected MCP Server",
    stateless_http=True,
    json_response=True,
    # Token verifier for authentication
    token_verifier=Auth0TokenVerifier(),
    # Auth settings for RFC 9728 Protected Resource Metadata
    auth=AuthSettings(
        issuer_url=AnyHttpUrl(f"https://{AUTH0_DOMAIN}"),  # Auth0 Authorization Server URL
        resource_server_url=AnyHttpUrl(f"http://{MCP_HOST}:{MCP_PORT}"),
        required_scopes=[REQUIRED_PERMISSION],
    ),
    host=MCP_HOST,
    port=MCP_PORT
)


@mcp.tool()
def get_time() -> str:
    """Get the current time. (Format: YYYY-MM-DD HH:MM:SS)"""
    now = datetime.datetime.now()
    return now.strftime("%Y-%m-%d %H:%M:%S")


if __name__ == "__main__":
    logger.info(f"Starting Auth0 Protected MCP Server on {MCP_HOST}:{MCP_PORT}")
    logger.info(f"Auth0 Domain: {AUTH0_DOMAIN}")
    logger.info(f"Auth0 Audience: {AUTH0_AUDIENCE}")
    mcp.run(transport="streamable-http")

サーバー側におけるMCPに関連する部分を重点的に解説していきます。

アクセストークン検証
class Auth0TokenVerifier(TokenVerifier):
    """Auth0 token verifier with flexible authorization modes."""

    async def verify_token(self, token: str) -> AccessToken | None:
        """Verify Auth0 JWT token and check user authorization."""
        try:
            # Log token for debugging (first 50 chars only for security)
            logger.debug(f"Received token: {token}")

            # Get JWKS from Auth0
            jwks_url = f"https://{AUTH0_DOMAIN}/.well-known/jwks.json"
            jwks_client = PyJWKClient(jwks_url)
            signing_key = jwks_client.get_signing_key_from_jwt(token).key


            # Decode and verify the token
            payload = jwt.decode(
                token,
                signing_key,
                algorithms=[JWT_ALGORITHM],
                audience=AUTH0_AUDIENCE,
                issuer=f"https://{AUTH0_DOMAIN}/"
            )

            logger.debug(f"Decoded payload: {payload}")

            # Extract user information
            user_id = payload.get("sub")
            scopes = payload.get("scope", "").split()
            expires_at = payload.get("exp", 0)
            # permissions = payload.get("permissions", [])
            # roles = payload.get("https://mcp-server/roles", [])  # Custom claim for roles

            logger.debug(f"Token verified for user: {user_id}")
            # logger.debug(f"User permissions: {permissions}")
            # logger.debug(f"User roles: {roles}")
            logger.debug(f"User scopes: {scopes}")
            logger.debug(f"User ID: {user_id}")
            logger.debug(f"Token expires at: {expires_at}")

            # Return AccessToken with user information
            return AccessToken(
                token=token,
                client_id=user_id,
                expires_at=expires_at,
                scopes=scopes,
            )

        except jwt.ExpiredSignatureError:
            logger.error("Token has expired")
            return None
        except jwt.InvalidTokenError as e:
            logger.error(f"Invalid token: {str(e)}")
            return None
        except Exception as e:
            logger.error(f"Token verification failed: {str(e)}")
            return None

上記の部分はMCPクライアントから送られてきたアクセストークンの検証を実施する部分です。
今回のAuth0の設定ではアクセストークンがJWT形式であるため、JWTの検証を実施しています。
Auth0ではhttps://{AUTH0_DOMAIN}/.well-known/jwks.jsonというURLにJWTの検証に必要な共通鍵などの情報が格納されているため、ここの値を参照し、検証を進めていきます。

実際に格納されている値の例
https://{AUTH0_DOMAIN}/.well-known/jwks.jsonに格納されている情報例
{
  "keys":[
    {
      "kty":"RSA",
      "use":"sig",
      "n":"xxxxxxxxx",
      "e":"AQAB",
      "kid":"xxxxxx",
      "x5t":"xxxxx",
      "x5c":["xxxxxxxx"],
      "alg":"RS256"
    },{
      "kty":"RSA",
      "use":"sig",
      "n":"xxxxxxxxx",
      "e":"AQAB",
      "kid":"xxxxxx",
      "x5t":"xxxxx",
      "x5c":["xxxxxxxx"],
      "alg":"RS256"
    }
  ]
}

verify_tokenメソッド内でJWT自体の検証を実施し、呼び出し元に以下のように定義されているAccessTokenクラスのインスタンスを返却することで、適切なアクセス権限が付与されていないMCPクライアントや有効期限が切れているアクセストークンを利用していた場合に403 Forbiddenの応答を返すようにすることができます。
ちなみにですが、筆者環境だとAccessToken.resourceパラメータは検証に利用されていなかったため、検証したい場合は独自カスタムクラスをラップして実装したりする必要があると思います。

AccessTokenクラス
class AccessToken(BaseModel):
    token: str
    client_id: str
    scopes: list[str]
    expires_at: int | None = None
    resource: str | None = None  # RFC 8707 resource indicator

以下には実際に動作させるMCPサーバーのインスタンス化部分を記載しています。
今回実装するMCPサーバーは単純にサーバー時間を返すだけツール機能を用意したものをtransport="streamable-http"として実装しています。
またOAuthによる認可を導入するために、FastMCPクラスのコンストラクタにおけるauth引数にAuth0で設定した値などを設定しています。

MCPサーバーのインスタンス化
# Create FastMCP instance as a Resource Server
mcp = FastMCP(
    "Auth0 Protected MCP Server",
    stateless_http=True,
    json_response=True,
    # Token verifier for authentication
    token_verifier=Auth0TokenVerifier(),
    # Auth settings for RFC 9728 Protected Resource Metadata
    auth=AuthSettings(
        issuer_url=AnyHttpUrl(f"https://{AUTH0_DOMAIN}"),  # Auth0 Authorization Server URL
        resource_server_url=AnyHttpUrl(f"http://{MCP_HOST}:{MCP_PORT}"),
        required_scopes=[REQUIRED_PERMISSION],
    ),
    host=MCP_HOST,
    port=MCP_PORT
)


@mcp.tool()
def get_time() -> str:
    """Get the current time. (Format: YYYY-MM-DD HH:MM:SS)"""
    now = datetime.datetime.now()
    return now.strftime("%Y-%m-%d %H:%M:%S")

動作結果

動作させる場合はuv run mcp_server.pyuv run mcp_client.pyの順でMCPサーバー → MCPクライアントという順でPythonプログラムを起動していきます。
実際に「mcp_client.py」を動作させるとAuth0における動作させているアプリケーションに対してのアクセスを承認するための画面がブラウザで表示されるので、あらかじめアクセス権限を付与していおいたユーザーでログインし、承認します。

auth0_user_login.png

このプログラムでは、あらかじめアクセス権限を付与したユーザーでの承認が完了するとユーザーからの入力を受け付けるようになっているため、試しに「What time is it now ?」と入力してみます。すると、MCPクライアントを実行しているターミナルで以下のようなログが出力されていることを確認でき、AIアプリケーションが現在の日時を把握できていることが確認できます。

User input: What time is it now ?
2025-12-09 18:33:43,218 - __main__ - INFO - Agent response: The current time is 2025-12-09 18:33:41.

またMCPサーバー側のログを一部抜粋してみてみると、以下のようになっており、最初にMCPクライアントがアクセストークン無しでアクセスを試み、失敗していることが確認できます。
その後のやり取りに関しては、特にアクセスを拒否することなくMCPサーバーが動作していることの確認ができます。

アクセス権限を付与したユーザーによる承認を実施した際のログ
INFO:     Started server process [30916]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:5555 (Press CTRL+C to quit)
INFO:     ::1:49683 - "POST /mcp HTTP/1.1" 401 Unauthorized
INFO:     ::1:49683 - "GET /.well-known/oauth-protected-resource HTTP/1.1" 200 OK
INFO:     ::1:57136 - "POST /mcp HTTP/1.1" 200 OK
INFO:     ::1:57136 - "POST /mcp HTTP/1.1" 202 Accepted
INFO:     ::1:51460 - "POST /mcp HTTP/1.1" 200 OK
INFO:     ::1:51460 - "POST /mcp HTTP/1.1" 200 OK
INFO:     ::1:51463 - "POST /mcp HTTP/1.1" 200 OK
INFO:     ::1:51463 - "POST /mcp HTTP/1.1" 202 Accepted
INFO:     ::1:51469 - "POST /mcp HTTP/1.1" 200 OK
INFO:     ::1:51472 - "POST /mcp HTTP/1.1" 200 OK
INFO:     ::1:51472 - "POST /mcp HTTP/1.1" 202 Accepted
INFO:     ::1:51475 - "POST /mcp HTTP/1.1" 200 OK
INFO:     ::1:51475 - "POST /mcp HTTP/1.1" 200 OK

次にあらかじめアクセス権限を付与していないユーザーによる承認を実施した場合のMCPサーバーのログは以下のようになっており、正常にアクセス権限が付与されていないことが確認できます。

アクセス権限を付与していないユーザーによる承認を実施した際のログ
INFO:     Started server process [9912]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:5555 (Press CTRL+C to quit)
INFO:     ::1:62816 - "POST /mcp HTTP/1.1" 401 Unauthorized
INFO:     ::1:62816 - "GET /.well-known/oauth-protected-resource HTTP/1.1" 200 OK
INFO:     ::1:54066 - "POST /mcp HTTP/1.1" 403 Forbidden

感想

今回はMCPに認可機能が実装されているということを記事執筆の半年くらい前に知っていたのですが、あまり情報が増えない & 実際にMCPサーバーを外部公開する際に必須な技術だと思ったのでこの記事を書いてみました。
MCPサーバーに市中の認証/認可サービスを用いたOAuthによるアクセス制限を実装する方法がなかなか見つからなかったため、公式が提供しているサンプルプログラムをメインの情報源として実装したため、なかなか大変でしたが、いい経験になったと思います。
(ちなみに一番苦労したのはAuth0の設定だったりします...)
MCP自体はまだまだ発展途上の規格だったりするので、今後のアップデートで今回作成したプログラムが動作しなくなったりする可能性はありますが、最新の規格と照らし合わせて参考にしてもらえると助かります。

また明日以降もNTTテクノクロスアドベントカレンダー2025の記事が投稿されるため引き続きお楽しみいただけると助かります。

参考にしたサイト

10
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?