この記事は、NTTテクノクロス Advent Calendar 2025 シリーズ2の18日目の記事になります。
皆さんこんにちは。NTTテクノクロスの西川です。
以前まではXR系の部署に所属していたのですが、今年度からセキュリティ系の部署に異動したため、心機一転してセキュリティに関連する記事を書いてみたいと思います。
セキュリティといってもセキュリティメインの話ではなく、最近LLM関連のキーワードとしてよく聞くようになっているModel Context Protocol (MCP) をメインにセキュアに運用するため必要となる認可機能を実装してみたという話をしたいと思います。
また本記事はOAuthを利用した認可処理を実装するため、「実装」を読む前にOAuthに関する基本的な知識があると、内容をより理解することができると思います。
MCP (Model Context Protocol)
MCPとは?
最近のLLMを利用することができるシステムとかだとMCPで連携することができますみたいな話をよく聞くようになっていると思いますが、そもそもMCPとは何でしょうか?
MCP自体はClaudeの開発で知られれているAnthropicが提唱しているLLMベースのAIアプリケーションが外部のリソースを簡単に利用するための通信規格みたいなものです。
(通信規格と言っていますが、MCP単体でRFCとかに登録されているわけではなく、まだまだ発展途上のものです。)
MCP自体の説明はMCPの公式ドキュメントでは、
Think of MCP like a USB-C port for AI applications. Just as USB-C provides a standardized way to connect electronic devices, MCP provides a standardized way to connect AI applications to external systems.
(機械翻訳)
MCPはAIアプリケーション用のUSB-Cポートのようなものだと考えてください。USB-Cが電子機器を接続するための標準化された方法を提供するのと同様に、MCPはAIアプリケーションを外部システムに接続するための標準化された方法を提供します。
と書かれており、これからAIアプリケーションに提供する外部機能を開発する際にはMCPに従って機能提供をすると簡単にAIアプリケーションへの容易な組み込みが可能になっていくものと思われます。
(記事執筆時点でも色々なAIアプリケーションが対応してきているので、デファクトスタンダードの立ち位置を獲得しつつあると思います。)
例えば、Visual Studio CodeにもMCPを用いてAIエージェントが外部のリソースにアクセスする手段が提供されており、

こんな感じで設定項目にも試験段階などではあるものの利用できる機能が増えつつあります。
MCPを利用するときに知っておきたい機能
ここでは、MCPを利用するにあたって知っておきたい情報をいくつか説明します。
まず、MCPはAIアプリケーション内で動作するMCPクライアントと外部リソースを提供するMCPサーバー間でプロトコルに従った情報のやり取りをすることで、AIアプリケーションとサーバー間で簡単にやり取りができるようになるというものです。

(MCP公式ドキュメントから画像を引用)
そのため、AIアプリケーション側の開発者はクライアント側の仕様、AIアプリケーションに外部リソースを提供する開発者はサーバー側の仕様に従った実装を行うことで、市中に存在するMCP対応のアプリケーションとして利用/組み込みを行うことができるようになります。
また、MCPクライアント/サーバーが提供する機能としては、
- MCPクライアント
- Elicitation: サーバーからユーザーへの情報を要求
- Roots:サーバー内での情報取得範囲の制限を付与
- Sampling: サーバー側がユーザー側のLLM利用を要求
- MCPサーバー
- Tools: LLMにおけるツールの利用
- Resources: LLMが利用できるデータの公開
- Prompts: LLMが活用できるプロンプトのテンプレート公開
というようなものが用意されています。(2025-11-25時点の仕様)
公式ドキュメントから確認できるAIアプリケーションにおける各機能への対応表を見てみると、この記事を執筆している時点(2025/12)では、MCPクライアントの機能に対応していないものが多く、MCPサーバー側の機能、特にTools機能はすべてのAIアプリケーションが対応していることが確認できます。
MCPが用いるデータの通信方式(Transports)としては、
- Stdio transport(ローカルで立ち上げたプロセスとの標準入出力を利用した方式)
- Streamable HTTP transport(HTTP通信を利用しリモートでのやりとりも可能な方式)
がデフォルトでサポートされています。
また実装しようと思えばCustom Transportsという形でWeb Socketなどで実装することも可能です。
MCPにおける認証/認可機能の対応
以前までのMCPサーバーを用いた外部リソースへのアクセスなどは基本的に
- ローカルに記述したCredentialsなど
- Stdio transportsで起動したローカルのプロセス
を利用することで外部のリソースにアクセスしていました。
ですが、実際にMCPサーバーを開発して幅広く利用してもらうためには、
- Streamable HTTP transportsで立ち上げておいたMCPサーバー
- OAuth/OIDCなどを用いた認可/認証による適切なアクセス権限の付与
によって実装されることが望ましいです。
一応、記事執筆時点における最新の仕様にはAuthorization(認可)に関する仕様が定まっており、OAuth 2.0/2.1を利用した適切なアクセス権限による利用が可能となっています。
(最近だと、2025/06/12GithubはOAuth 2.0 を利用したリモートのMCPサーバーアクセスを実現していたりします。)
そこで本記事では、実際にPython SDK for MCPとAuth0を利用して適切なアクセス権限が付与されたユーザーのみに利用可能なMCPサーバーの実装をしていきたいと思います。
実装
今回の実装においては、ある程度無料で利用可能な認可サービスとして「Auth0」を、MCPクライアント/サーバーの実装にはPythonに慣れていることもあって、Python SDK for MCPを利用して実装してみたいと思います。
Auth0側の設定
まず初めに認可アプリケーションであるAuth0の設定を実施します。
そもそもAuth0とは、アプリケーションやWebサービス向けの認証/認可機能を簡単に導入/提供するためのクラウドサービスであり、無料からでも利用が可能です。
アプリケーションの作成
Auth0にログインしたら、まずはアプリケーションを作成します。
アプリケーション名は好きな名前にしてもらい、アプリケーションの種類を「一般的なWebアプリケーション」に設定し作成します。

次に作成したアプリケーションの「設定 → 基本情報」から、ドメインとクライアントID、クライアントシークレットを確認し、記録しておきます。ここで記録した値はMCPクライアントの実装で利用します。

そのまま下にスクロールし、「アプリケーションのURI」から
- 許可するCallback URL: http://localhost:5000/callback
- 許可するログアウトURL: http://localhost:5000
- 許可するWebオリジン: http://localhost:5000
というように設定します。
今回はPOCのような検証環境を想定しているため、暗号化しないHTTPプロトコルによるコールバックURLを設定しましたが、本番環境では適宜HTTPSプロトコルによるセキュアな通信などで読み替えて実装してください。
APIの作成
次に左側の選択肢から「アプリケーション → API」と選択しAPIを作成します。
この時に設定する識別子(identifier)は後から変更できない上に、MCPクライアントの実装で利用するため注意してください。

APIの設定における「RBACの設定」から「RBACを有効化」と「アクセストークンに権限を追加する」という項目をONにします。この操作を実施することで特定のユーザーのみにアクセス権限を付与することができるようになります。

また、「アクセストークンの設定」が以下のようになっていることを確認し、JWTの署名アルゴリズムをプログラムのために記録しておきます。

次の設定は「パーミッション」から「パーミッションを追加する」という項目を探し、以下のように設定します。
ここで設定したパーミッションはMCPサーバーに対するアクセス権限の確認で利用するため注意してください。
ユーザー管理の設定
次に実際にアクセス権限を付与するユーザーを設定します。
まずは「ユーザー管理 → ロール」に移動、「+ ロールを作成」を押下し、ロールを作成します。
作成したロールには「パーミッション」の「パーミッションを追加」を押下し、以下のようにして先ほど作成したAPIと紐づける形でアクセス権限を付与します。

その次は「ユーザー管理 → ユーザー」から「+ ユーザーを作成」を押下し、メールアドレスとパスワードに適当な値を設定しユーザーを作成します。
作成したユーザーには「ロール」か「ロールの割り当て」を選択し、先ほど作成したロールを割り当てます。

ここまでの操作でAuth0側の設定は完了です。
AIアプリケーションの実装
ここからはPythonを利用したMCPクライアント/サーバーの実装を行います。
ソースコードはGithubに公開しているので気になった方は参照してみてください。
また実装したソースコードは、Python SDK for MCPで紹介されているサンプルプログラムを基に一部実装をいたしました。
筆者の実装環境は以下のようになっています。
注意点としては、mcpライブラリはまだまだ発展途上のためバージョンを筆者と合わせるかなるべく新しいバージョンを利用してください。
筆者は少し古いバージョンを利用したときにライブラリ側のバグでOAuthにおける認可トークンの受け取り先URLが想定していないものになったりしました。
- パッケージ管理: uv
- Python バージョン: 3.13
- 利用パッケージとバージョン:
langchain==1.1.2langchain-google-genai==4.0.0langchain-mcp-adapters==0.1.14mcp[cli]==1.23.1pyjwt==2.10.1selenium==4.38.0
.envの設定
まずはAuth0の設定で記録した値を基に.envファイルの設定を実施します。
AUTH0_AUDIENCEにはAPIで設定した識別子を設定します。
# Auth0 Configuration
# Get these values from your Auth0 dashboard: https://manage.auth0.com/
AUTH0_DOMAIN=dev-xxxx.xx.auth0.com
AUTH0_CLIENT_ID=xxxxxxx
AUTH0_CLIENT_SECRET=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
AUTH0_AUDIENCE=xxxxxxxx
JWT_ALGORITHM=RS256
# MCP Server Configuration
MCP_HOST=localhost
MCP_PORT=5555
# These are defined in your Auth0 API settings
REQUIRED_PERMISSION=access:mcp
# MCP Client Configuration (for oauth_client.py)
# Callback URI must match the one configured in Auth0
OAUTH_CALLBACK_URI=http://localhost:5000/callback
OAUTH_CALLBACK_PORT=5000
GEMINI_API_KEY=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
今回はMCPサーバーのツール呼び出しにGeminiを利用したエージェントによる自動呼出しを実装したいと思っているため、GEMINI_API_KEYにGoogle AI Studioで取得したAPIキーをセットします。
MCPクライアント
次にMCPクライアントを実装します。
ソースコード全文
import asyncio
from urllib.parse import parse_qs, urlparse, urlunparse, urlencode
import os
import time
from dotenv import load_dotenv
import logging
import webbrowser
from http.server import HTTPServer, BaseHTTPRequestHandler
import threading
import httpx
from mcp.client.auth import OAuthFlowError
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.firefox.options import Options as FirefoxOptions
from selenium.webdriver.edge.options import Options as EdgeOptions
from selenium.common.exceptions import WebDriverException
from typing import Any
from pydantic import AnyUrl
from mcp import ClientSession
from mcp.client.auth import OAuthClientProvider, TokenStorage
from mcp.client.streamable_http import streamablehttp_client
from mcp.shared.auth import OAuthClientInformationFull, OAuthClientMetadata, OAuthToken
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent
from langchain_google_genai import ChatGoogleGenerativeAI
# Setup logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# logger.setLevel(logging.DEBUG)
# Load environment variables
load_dotenv()
AUTH0_DOMAIN=os.environ["AUTH0_DOMAIN"]
AUTH0_CLIENT_ID=os.environ["AUTH0_CLIENT_ID"]
AUTH0_CLIENT_SECRET=os.environ["AUTH0_CLIENT_SECRET"]
AUTH0_AUDIENCE = os.environ["AUTH0_AUDIENCE"]
MCP_HOST=os.environ["MCP_HOST"]
MCP_PORT=int(os.environ["MCP_PORT"])
REQUIRED_PERMISSION=os.environ["REQUIRED_PERMISSION"]
OAUTH_CALLBACK_URI=os.environ["OAUTH_CALLBACK_URI"]
OAUTH_CALLBACK_PORT=int(os.environ["OAUTH_CALLBACK_PORT"])
def open_browser_incognito(url: str) -> bool:
"""
Open URL in incognito/private browsing mode using Selenium.
Tries multiple browsers in order: Chrome, Firefox, Edge.
Falls back to normal mode if incognito mode fails.
Returns:
bool: True if successfully opened, False otherwise
"""
# Try Chrome with incognito mode
try:
chrome_options = ChromeOptions()
chrome_options.add_argument("--incognito")
driver = webdriver.Chrome(options=chrome_options)
driver.get(url)
logger.info("Opened URL in Chrome incognito mode")
return True
except WebDriverException as e:
logger.debug(f"Failed to open Chrome: {e}")
except Exception as e:
logger.debug(f"Failed to open Chrome: {e}")
# Try Firefox with private browsing mode
try:
firefox_options = FirefoxOptions()
firefox_options.add_argument("-private")
driver = webdriver.Firefox(options=firefox_options)
driver.get(url)
logger.info("Opened URL in Firefox private mode")
return True
except WebDriverException as e:
logger.debug(f"Failed to open Firefox: {e}")
except Exception as e:
logger.debug(f"Failed to open Firefox: {e}")
# Try Edge with InPrivate mode
try:
edge_options = EdgeOptions()
edge_options.add_argument("--inprivate")
driver = webdriver.Edge(options=edge_options)
driver.get(url)
logger.info("Opened URL in Edge InPrivate mode")
return True
except WebDriverException as e:
logger.debug(f"Failed to open Edge: {e}")
except Exception as e:
logger.debug(f"Failed to open Edge: {e}")
# Fall back to default browser (normal mode)
logger.warning("Could not open in incognito mode, falling back to normal browser")
try:
webbrowser.open(url)
return True
except Exception as e:
logger.error(f"Failed to open browser: {e}")
return False
class InMemoryTokenStorage(TokenStorage):
"""Demo In-memory token storage implementation."""
def __init__(self):
self.tokens: OAuthToken | None = None
self.client_info: OAuthClientInformationFull | None = None
async def get_tokens(self) -> OAuthToken | None:
"""Get stored tokens."""
return self.tokens
async def set_tokens(self, tokens: OAuthToken) -> None:
"""Store tokens."""
time.sleep(1) # iat delay to ensure token consistency
self.tokens = tokens
async def get_client_info(self) -> OAuthClientInformationFull | None:
"""Get stored client information."""
return self.client_info
async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
"""Store client information."""
self.client_info = client_info
class OAuthCallbackHandler:
"""OAuth callback handler that opens browser and captures the callback."""
def __init__(self, audience: str):
self.auth_url: str = ""
self.auth_code: str | None = None
self.state: str | None = None
self.error: str | None = None
self.callback_received = threading.Event()
self.audience: str = audience
# Create and start HTTP server in a separate thread
self.server = HTTPServer(('localhost', OAUTH_CALLBACK_PORT), self._create_callback_handler())
self.server_thread = threading.Thread(target=self.server.serve_forever)
self.server_thread.daemon = True
self.server_thread.start()
def __del__(self):
"""Shutdown the HTTP server on deletion."""
# Stop the server
self.server.shutdown()
self.server_thread.join(timeout=1)
def _create_callback_handler(self):
"""Create an HTTP request handler for OAuth callback."""
handler_self = self
class CallbackHandler(BaseHTTPRequestHandler):
def do_GET(self):
"""Handle GET request from OAuth callback."""
# Parse the callback URL
parsed_url = urlparse(self.path)
params = parse_qs(parsed_url.query)
# Extract authorization code and state
handler_self.auth_code = params.get("code", [None])[0]
handler_self.state = params.get("state", [None])[0]
handler_self.error = params.get("error", [None])[0]
# Send response to browser
self.send_response(200)
self.send_header("Content-type", "text/html; charset=utf-8")
self.end_headers()
if handler_self.error:
html_response = f"""
<!DOCTYPE html>
<html>
<head>
<title>認証エラー</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 50px; background-color: #f8d7da; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; background: white; border-radius: 5px; }}
h1 {{ color: #721c24; }}
</style>
</head>
<body>
<div class="container">
<h1>❌ 認証に失敗しました</h1>
<p>エラー: {handler_self.error}</p>
<p>このウィンドウを閉じて、もう一度お試しください。</p>
</div>
</body>
</html>
"""
else:
html_response = """
<!DOCTYPE html>
<html>
<head>
<title>認証成功</title>
<style>
body { font-family: Arial, sans-serif; margin: 50px; background-color: #d4edda; }
.container { max-width: 600px; margin: 0 auto; padding: 20px; background: white; border-radius: 5px; }
h1 { color: #155724; }
</style>
</head>
<body>
<div class="container">
<h1>✅ 認証に成功しました!</h1>
<p>ログインが完了しました。このウィンドウを閉じて、アプリケーションにお戻りください。</p>
</div>
</body>
</html>
"""
self.wfile.write(html_response.encode('utf-8'))
# Signal that callback was received
handler_self.callback_received.set()
return CallbackHandler
async def handle_redirect(self, auth_url: str) -> None:
"""Handle the OAuth redirect by opening the browser automatically."""
pr = urlparse(auth_url)
d = parse_qs(pr.query)
d["audience"] = [self.audience]
# d["scope"] = ["openid profile email"]
auth_url = urlunparse(pr._replace(query=urlencode(d, doseq=True)))
self.auth_url = auth_url
logger.info("=" * 60)
logger.info("Authorization Required")
logger.info("=" * 60)
logger.info("Opening browser for authorization in incognito/private mode...")
logger.info(f"\nAuthorization URL: {auth_url}\n")
# Open browser automatically in incognito mode
if not open_browser_incognito(auth_url):
logger.error("Failed to open browser")
logger.info("Please complete the login in your browser.")
logger.info("=" * 60)
async def handle_callback(self) -> tuple[str, str | None]:
"""Handle the OAuth callback by running a local HTTP server."""
# Parse callback URI from environment to get port
callback_uri = OAUTH_CALLBACK_URI
parsed_callback = urlparse(callback_uri)
port = parsed_callback.port or 5000
logger.info(f"Starting local HTTP server on port {port}...")
logger.info("Waiting for authorization callback...")
# Wait for callback to be received
await asyncio.get_event_loop().run_in_executor(
None, self.callback_received.wait
)
if self.error:
logger.error(f"Authorization failed: {self.error}")
raise Exception(f"Authorization failed: {self.error}")
if not self.auth_code:
logger.error("No authorization code received")
raise Exception("No authorization code received")
logger.info("Authorization code received successfully")
logger.debug("auth_code: %s", self.auth_code)
logger.debug("state: %s", self.state)
return self.auth_code, self.state
class OAuthClientProviderCustom(OAuthClientProvider):
def __init__(self, audience: str, **kwargs):
super().__init__(**kwargs)
self.audience = audience
# override to include audience in token request
async def _exchange_token_authorization_code(
self, auth_code: str, code_verifier: str, *, token_data: dict[str, Any] | None = {}
) -> httpx.Request:
"""Build token exchange request for authorization_code flow."""
if self.context.client_metadata.redirect_uris is None:
raise OAuthFlowError("No redirect URIs provided for authorization code grant") # pragma: no cover
if not self.context.client_info:
raise OAuthFlowError("Missing client info") # pragma: no cover
token_url = self._get_token_endpoint()
token_data = token_data or {}
token_data.update(
{
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": str(self.context.client_metadata.redirect_uris[0]),
"client_id": self.context.client_info.client_id,
"code_verifier": code_verifier,
"audience": self.audience,
}
)
# Only include resource param if conditions are met
if self.context.should_include_resource_param(self.context.protocol_version):
token_data["resource"] = self.context.get_resource_url() # RFC 8707
if self.context.client_info.client_secret:
token_data["client_secret"] = self.context.client_info.client_secret
logger.debug(f"Token request data: {token_data}")
return httpx.Request(
"POST", token_url, data=token_data, headers={"Content-Type": "application/x-www-form-urlencoded"}
)
async def main():
"""Run the Auth0 MCP client."""
mcp_url = f"http://{MCP_HOST}:{MCP_PORT}"
logger.info(f"Connecting to MCP server at: {mcp_url}")
logger.info(f"Using callback URI: {OAUTH_CALLBACK_URI}")
logger.debug(f"Using Auth0 client ID: {AUTH0_CLIENT_ID}")
logger.debug(f"Using Auth0 client secret: {AUTH0_CLIENT_SECRET}")
# Create storage for tokens and pre-registered client info
storage = InMemoryTokenStorage()
# Pre-populate with registered client information
await storage.set_client_info(OAuthClientInformationFull(
client_id=AUTH0_CLIENT_ID,
client_secret=AUTH0_CLIENT_SECRET,
redirect_uris=[AnyUrl(OAUTH_CALLBACK_URI)],
client_metadata=OAuthClientMetadata(
client_name="Auth0 MCP Client",
redirect_uris=[AnyUrl(OAUTH_CALLBACK_URI)],
grant_types=["authorization_code", "refresh_token"],
response_types=["code"],
scope=REQUIRED_PERMISSION,
)
))
# Create callback handler instance that can share state
callback_handler = OAuthCallbackHandler(audience=AUTH0_AUDIENCE)
# Create OAuth client provider with pre-registered client
oauth_auth = OAuthClientProviderCustom(
audience=AUTH0_AUDIENCE,
server_url=mcp_url,
client_metadata=OAuthClientMetadata(
client_name="Auth0 MCP Client",
redirect_uris=[AnyUrl(OAUTH_CALLBACK_URI)],
grant_types=["authorization_code", "refresh_token"],
response_types=["code"],
scope=REQUIRED_PERMISSION,
),
storage=storage,
redirect_handler=callback_handler.handle_redirect,
callback_handler=callback_handler.handle_callback,
)
tool_names = []
try:
async with streamablehttp_client(url=f"{mcp_url}/mcp", auth=oauth_auth) as (read, write, _):
async with ClientSession(read, write) as session:
logger.info("Initializing session...")
await session.initialize()
logger.info("Session initialized successfully")
session
# List available tools
tools = await session.list_tools()
tool_names = [tool.name for tool in tools.tools]
logger.info(f"Available tools: {tool_names}")
# Call a tool as an example
if tools.tools:
logger.info("\nCalling 'get_time' tool...")
result = await session.call_tool("get_time", arguments={})
logger.info(f"Result: {result.content}")
except Exception as e:
logger.error(f"Error connecting to MCP server: {str(e)}")
raise
logger.debug(f"OAuth token: {await storage.get_tokens()}")
model = ChatGoogleGenerativeAI(
model="gemini-2.5-flash",
google_api_key=os.environ["GOOGLE_API_KEY"]
)
token = await storage.get_tokens()
token = token.access_token
tools_info = [
{
name: {
"transport": "streamable_http",
"url": f"{mcp_url}/mcp",
"auth": oauth_auth,
# "headers": {
# "Authorization": f"Bearer {token}" # use existing token
# },
}
} for name in tool_names
]
tools_dict = {}
for tool in tools_info:
tools_dict.update(tool)
client = MultiServerMCPClient(tools_dict)
tools = await client.get_tools()
agent = create_agent(model=model, tools=tools)
user_message = input("User input: ")
response = await agent.ainvoke({"messages": user_message})
logger.info(f"Agent response: {response["messages"][-1].content}")
if __name__ == "__main__":
# Normal mode - connect to MCP server
asyncio.run(main())
MCPに関連する部分を重点的に解説していきます。
class InMemoryTokenStorage(TokenStorage):
"""Demo In-memory token storage implementation."""
def __init__(self):
self.tokens: OAuthToken | None = None
self.client_info: OAuthClientInformationFull | None = None
async def get_tokens(self) -> OAuthToken | None:
"""Get stored tokens."""
return self.tokens
async def set_tokens(self, tokens: OAuthToken) -> None:
"""Store tokens."""
time.sleep(1) # iat delay to ensure token consistency
self.tokens = tokens
async def get_client_info(self) -> OAuthClientInformationFull | None:
"""Get stored client information."""
return self.client_info
async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
"""Store client information."""
self.client_info = client_info
これは、認可が成功した場合にアクセストークンなどの情報を管理するためのクラスです。
Python SDK for MCPによるOAuth認証フローが成功した場合は、ここにトークンなどが格納されていきます。
また、set_tokensメソッドにsleep処理が入っていますが、これはある時からJWTの発行された時間を示すiatパラメータの検証で失敗するようになってしまったため、時間の不整合を取り除くための暫定的な処理になります。(おそらくどこかのタイミングでサーバー側の時間とAuth0の時間が合致しなくなってしまったのだと思われるため、不要であれば削除しても問題ないです。)
一度アクセストークンが設定されたインスタンスを使いまわすと再度アクセストークンを取得せずともセキュアなMCPサーバーの利用が可能になります。
class OAuthCallbackHandler:
"""OAuth callback handler that opens browser and captures the callback."""
def __init__(self, audience: str):
self.auth_url: str = ""
self.auth_code: str | None = None
self.state: str | None = None
self.error: str | None = None
self.callback_received = threading.Event()
self.audience: str = audience
# Create and start HTTP server in a separate thread
self.server = HTTPServer(('localhost', OAUTH_CALLBACK_PORT), self._create_callback_handler())
self.server_thread = threading.Thread(target=self.server.serve_forever)
self.server_thread.daemon = True
self.server_thread.start()
def __del__(self):
"""Shutdown the HTTP server on deletion."""
# Stop the server
self.server.shutdown()
self.server_thread.join(timeout=1)
def _create_callback_handler(self):
"""Create an HTTP request handler for OAuth callback."""
handler_self = self
class CallbackHandler(BaseHTTPRequestHandler):
def do_GET(self):
"""Handle GET request from OAuth callback."""
# Parse the callback URL
parsed_url = urlparse(self.path)
params = parse_qs(parsed_url.query)
# Extract authorization code and state
handler_self.auth_code = params.get("code", [None])[0]
handler_self.state = params.get("state", [None])[0]
handler_self.error = params.get("error", [None])[0]
# Send response to browser
self.send_response(200)
self.send_header("Content-type", "text/html; charset=utf-8")
self.end_headers()
if handler_self.error:
html_response = f"""
<!DOCTYPE html>
<html>
<head>
<title>認証エラー</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 50px; background-color: #f8d7da; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; background: white; border-radius: 5px; }}
h1 {{ color: #721c24; }}
</style>
</head>
<body>
<div class="container">
<h1>❌ 認証に失敗しました</h1>
<p>エラー: {handler_self.error}</p>
<p>このウィンドウを閉じて、もう一度お試しください。</p>
</div>
</body>
</html>
"""
else:
html_response = """
<!DOCTYPE html>
<html>
<head>
<title>認証成功</title>
<style>
body { font-family: Arial, sans-serif; margin: 50px; background-color: #d4edda; }
.container { max-width: 600px; margin: 0 auto; padding: 20px; background: white; border-radius: 5px; }
h1 { color: #155724; }
</style>
</head>
<body>
<div class="container">
<h1>✅ 認証に成功しました!</h1>
<p>ログインが完了しました。このウィンドウを閉じて、アプリケーションにお戻りください。</p>
</div>
</body>
</html>
"""
self.wfile.write(html_response.encode('utf-8'))
# Signal that callback was received
handler_self.callback_received.set()
return CallbackHandler
async def handle_redirect(self, auth_url: str) -> None:
"""Handle the OAuth redirect by opening the browser automatically."""
pr = urlparse(auth_url)
d = parse_qs(pr.query)
d["audience"] = [self.audience]
# d["scope"] = ["openid profile email"]
auth_url = urlunparse(pr._replace(query=urlencode(d, doseq=True)))
self.auth_url = auth_url
logger.info("=" * 60)
logger.info("Authorization Required")
logger.info("=" * 60)
logger.info("Opening browser for authorization in incognito/private mode...")
print(f"\nAuthorization URL: {auth_url}\n")
# Open browser automatically in incognito mode
if not open_browser_incognito(auth_url):
logger.error("Failed to open browser")
logger.info("Please complete the login in your browser.")
logger.info("=" * 60)
async def handle_callback(self) -> tuple[str, str | None]:
"""Handle the OAuth callback by running a local HTTP server."""
# Parse callback URI from environment to get port
callback_uri = OAUTH_CALLBACK_URI
parsed_callback = urlparse(callback_uri)
port = parsed_callback.port or 5000
logger.info(f"Starting local HTTP server on port {port}...")
logger.info("Waiting for authorization callback...")
# Wait for callback to be received
await asyncio.get_event_loop().run_in_executor(
None, self.callback_received.wait
)
if self.error:
logger.error(f"Authorization failed: {self.error}")
raise Exception(f"Authorization failed: {self.error}")
if not self.auth_code:
logger.error("No authorization code received")
raise Exception("No authorization code received")
logger.info("Authorization code received successfully")
logger.info("auth_code: %s", self.auth_code)
logger.info("state: %s", self.state)
return self.auth_code, self.state
上記の部分はOAuthによる認可コードの取得とアクセストークンの取得を主に担当している部分になります。
handle_redirectメソッドはauth_redirect引数で受けとった認可サーバーへのリクエストのためのURLを基に、ブラウザ経由で利用者が対象のリソースに対して権限の承認を確認するために用意するコールバック関数です。このコールバック関数を呼び出すOAuthClientProviderはAuthorization Code Grantを利用したアクセスを要求するため、受け取ったURLをブラウザでアクセスし、認可コードの取得を試みるようにしています。
handle_callbackメソッドに関しては、認可サーバーとのやり取りでアクセスが承認された場合に、呼び出し元のOAuthClientProviderに認可コードとステートを伝えるためのコールバック関数になっています。処理としては、あらかじめ起動しておいたローカルサーバーでアクセスが承認された場合のリダイレクトとパラメータを受け取っておき、そのパラメータに含まれていた認可コードとステートを呼び出し元に返すというものになっています。
また、上記の処理だけだと認可コードしか発行されておらず、実際のMCPサーバーへのアクセスに利用されるアクセストークンが発行されていませんが、利用元のOAuthClientProviderクラスが自動でアクセストークンへの交換を実施し、InMemoryTokenStorageインスタンスに格納してくれます。
class OAuthClientProviderCustom(OAuthClientProvider):
def __init__(self, audience: str, **kwargs):
super().__init__(**kwargs)
self.audience = audience
# override to include audience in token request
async def _exchange_token_authorization_code(
self, auth_code: str, code_verifier: str, *, token_data: dict[str, Any] | None = {}
) -> httpx.Request:
"""Build token exchange request for authorization_code flow."""
if self.context.client_metadata.redirect_uris is None:
raise OAuthFlowError("No redirect URIs provided for authorization code grant") # pragma: no cover
if not self.context.client_info:
raise OAuthFlowError("Missing client info") # pragma: no cover
token_url = self._get_token_endpoint()
token_data = token_data or {}
token_data.update(
{
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": str(self.context.client_metadata.redirect_uris[0]),
"client_id": self.context.client_info.client_id,
"code_verifier": code_verifier,
"audience": self.audience,
}
)
# Only include resource param if conditions are met
if self.context.should_include_resource_param(self.context.protocol_version):
token_data["resource"] = self.context.get_resource_url() # RFC 8707
if self.context.client_info.client_secret:
token_data["client_secret"] = self.context.client_info.client_secret
logger.debug(f"Token request data: {token_data}")
return httpx.Request(
"POST", token_url, data=token_data, headers={"Content-Type": "application/x-www-form-urlencoded"}
)
上記の部分は、OAuthClientProviderクラスを継承した独自クラスのOAuthClientProviderCustom定義をしている部分になります。
この部分はAuth0の仕様として、認可コードを利用したアクセストークンの発行にaudienceを指定しないと、想定していたJWTではなく、共通鍵不明のJWEが発行されてしまうという事象へ対処するために実装しています。
(詳細はこのサイトを参考)
実装している内容としては、コンストラクタ処理で追加した引数audienceを受け取り、それを内部向けの_exchange_token_authorization_codeメソッドをオーバーライドする形で利用することで、アクセストークンの取得までを自動で実施するというものになっています。
# Create storage for tokens and pre-registered client info
storage = InMemoryTokenStorage()
# Pre-populate with registered client information
await storage.set_client_info(OAuthClientInformationFull(
client_id=AUTH0_CLIENT_ID,
client_secret=AUTH0_CLIENT_SECRET,
redirect_uris=[AnyUrl(OAUTH_CALLBACK_URI)],
client_metadata=OAuthClientMetadata(
client_name="Auth0 MCP Client",
redirect_uris=[AnyUrl(OAUTH_CALLBACK_URI)],
grant_types=["authorization_code", "refresh_token"],
response_types=["code"],
scope=REQUIRED_PERMISSION,
)
))
# Create callback handler instance that can share state
callback_handler = OAuthCallbackHandler(audience=AUTH0_AUDIENCE)
# Create OAuth client provider with pre-registered client
oauth_auth = OAuthClientProviderCustom(
audience=AUTH0_AUDIENCE,
server_url=mcp_url,
client_metadata=OAuthClientMetadata(
client_name="Auth0 MCP Client",
redirect_uris=[AnyUrl(OAUTH_CALLBACK_URI)],
grant_types=["authorization_code", "refresh_token"],
response_types=["code"],
scope=REQUIRED_PERMISSION,
),
storage=storage,
redirect_handler=callback_handler.handle_redirect,
callback_handler=callback_handler.handle_callback,
)
上記はOAuthを利用した認可フローを実施するために必要な情報を登録する部分になっています。
ここで適切に設定したOAuthClientProviderCustomインスタンスをstreamablehttp_clientメソッドやMultiServerMCPClientのコンストラクタに利用することでOAuthを利用したセキュアなMCPサーバーの利用を実現することができます。
MCPサーバー
ソースコード全文
from pydantic import AnyHttpUrl
from mcp.server.auth.provider import AccessToken, TokenVerifier
from mcp.server.auth.settings import AuthSettings
from mcp.server.fastmcp import FastMCP
import datetime
from dotenv import load_dotenv
import os
import jwt
from jwt import PyJWKClient
import logging
# Setup logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# logger.setLevel(logging.DEBUG)
# Load environment variables
load_dotenv()
AUTH0_DOMAIN=os.environ["AUTH0_DOMAIN"]
AUTH0_CLIENT_ID=os.environ["AUTH0_CLIENT_ID"]
AUTH0_CLIENT_SECRET=os.environ["AUTH0_CLIENT_SECRET"]
AUTH0_AUDIENCE = os.environ["AUTH0_AUDIENCE"]
MCP_HOST=os.environ["MCP_HOST"]
MCP_PORT=int(os.environ["MCP_PORT"])
REQUIRED_PERMISSION=os.environ["REQUIRED_PERMISSION"]
OAUTH_CALLBACK_URI=os.environ["OAUTH_CALLBACK_URI"]
OAUTH_CALLBACK_PORT=int(os.environ["OAUTH_CALLBACK_PORT"])
JWT_ALGORITHM = os.environ["JWT_ALGORITHM"]
class Auth0TokenVerifier(TokenVerifier):
"""Auth0 token verifier with flexible authorization modes."""
async def verify_token(self, token: str) -> AccessToken | None:
"""Verify Auth0 JWT token and check user authorization."""
try:
# Log token for debugging (first 50 chars only for security)
logger.debug(f"Received token: {token}")
# Get JWKS from Auth0
jwks_url = f"https://{AUTH0_DOMAIN}/.well-known/jwks.json"
jwks_client = PyJWKClient(jwks_url)
signing_key = jwks_client.get_signing_key_from_jwt(token).key
# Decode and verify the token
payload = jwt.decode(
token,
signing_key,
algorithms=[JWT_ALGORITHM],
audience=AUTH0_AUDIENCE,
issuer=f"https://{AUTH0_DOMAIN}/"
)
logger.debug(f"Decoded payload: {payload}")
# Extract user information
user_id = payload.get("sub")
scopes = payload.get("scope", "").split()
expires_at = payload.get("exp", 0)
# permissions = payload.get("permissions", [])
# roles = payload.get("https://mcp-server/roles", []) # Custom claim for roles
logger.debug(f"Token verified for user: {user_id}")
# logger.debug(f"User permissions: {permissions}")
# logger.debug(f"User roles: {roles}")
logger.debug(f"User scopes: {scopes}")
logger.debug(f"User ID: {user_id}")
logger.debug(f"Token expires at: {expires_at}")
# Return AccessToken with user information
return AccessToken(
token=token,
client_id=user_id,
expires_at=expires_at,
scopes=scopes,
)
except jwt.ExpiredSignatureError:
logger.error("Token has expired")
return None
except jwt.InvalidTokenError as e:
logger.error(f"Invalid token: {str(e)}")
return None
except Exception as e:
logger.error(f"Token verification failed: {str(e)}")
return None
# Create FastMCP instance as a Resource Server
mcp = FastMCP(
"Auth0 Protected MCP Server",
stateless_http=True,
json_response=True,
# Token verifier for authentication
token_verifier=Auth0TokenVerifier(),
# Auth settings for RFC 9728 Protected Resource Metadata
auth=AuthSettings(
issuer_url=AnyHttpUrl(f"https://{AUTH0_DOMAIN}"), # Auth0 Authorization Server URL
resource_server_url=AnyHttpUrl(f"http://{MCP_HOST}:{MCP_PORT}"),
required_scopes=[REQUIRED_PERMISSION],
),
host=MCP_HOST,
port=MCP_PORT
)
@mcp.tool()
def get_time() -> str:
"""Get the current time. (Format: YYYY-MM-DD HH:MM:SS)"""
now = datetime.datetime.now()
return now.strftime("%Y-%m-%d %H:%M:%S")
if __name__ == "__main__":
logger.info(f"Starting Auth0 Protected MCP Server on {MCP_HOST}:{MCP_PORT}")
logger.info(f"Auth0 Domain: {AUTH0_DOMAIN}")
logger.info(f"Auth0 Audience: {AUTH0_AUDIENCE}")
mcp.run(transport="streamable-http")
サーバー側におけるMCPに関連する部分を重点的に解説していきます。
class Auth0TokenVerifier(TokenVerifier):
"""Auth0 token verifier with flexible authorization modes."""
async def verify_token(self, token: str) -> AccessToken | None:
"""Verify Auth0 JWT token and check user authorization."""
try:
# Log token for debugging (first 50 chars only for security)
logger.debug(f"Received token: {token}")
# Get JWKS from Auth0
jwks_url = f"https://{AUTH0_DOMAIN}/.well-known/jwks.json"
jwks_client = PyJWKClient(jwks_url)
signing_key = jwks_client.get_signing_key_from_jwt(token).key
# Decode and verify the token
payload = jwt.decode(
token,
signing_key,
algorithms=[JWT_ALGORITHM],
audience=AUTH0_AUDIENCE,
issuer=f"https://{AUTH0_DOMAIN}/"
)
logger.debug(f"Decoded payload: {payload}")
# Extract user information
user_id = payload.get("sub")
scopes = payload.get("scope", "").split()
expires_at = payload.get("exp", 0)
# permissions = payload.get("permissions", [])
# roles = payload.get("https://mcp-server/roles", []) # Custom claim for roles
logger.debug(f"Token verified for user: {user_id}")
# logger.debug(f"User permissions: {permissions}")
# logger.debug(f"User roles: {roles}")
logger.debug(f"User scopes: {scopes}")
logger.debug(f"User ID: {user_id}")
logger.debug(f"Token expires at: {expires_at}")
# Return AccessToken with user information
return AccessToken(
token=token,
client_id=user_id,
expires_at=expires_at,
scopes=scopes,
)
except jwt.ExpiredSignatureError:
logger.error("Token has expired")
return None
except jwt.InvalidTokenError as e:
logger.error(f"Invalid token: {str(e)}")
return None
except Exception as e:
logger.error(f"Token verification failed: {str(e)}")
return None
上記の部分はMCPクライアントから送られてきたアクセストークンの検証を実施する部分です。
今回のAuth0の設定ではアクセストークンがJWT形式であるため、JWTの検証を実施しています。
Auth0ではhttps://{AUTH0_DOMAIN}/.well-known/jwks.jsonというURLにJWTの検証に必要な共通鍵などの情報が格納されているため、ここの値を参照し、検証を進めていきます。
実際に格納されている値の例
{
"keys":[
{
"kty":"RSA",
"use":"sig",
"n":"xxxxxxxxx",
"e":"AQAB",
"kid":"xxxxxx",
"x5t":"xxxxx",
"x5c":["xxxxxxxx"],
"alg":"RS256"
},{
"kty":"RSA",
"use":"sig",
"n":"xxxxxxxxx",
"e":"AQAB",
"kid":"xxxxxx",
"x5t":"xxxxx",
"x5c":["xxxxxxxx"],
"alg":"RS256"
}
]
}
verify_tokenメソッド内でJWT自体の検証を実施し、呼び出し元に以下のように定義されているAccessTokenクラスのインスタンスを返却することで、適切なアクセス権限が付与されていないMCPクライアントや有効期限が切れているアクセストークンを利用していた場合に403 Forbiddenの応答を返すようにすることができます。
ちなみにですが、筆者環境だとAccessToken.resourceパラメータは検証に利用されていなかったため、検証したい場合は独自カスタムクラスをラップして実装したりする必要があると思います。
class AccessToken(BaseModel):
token: str
client_id: str
scopes: list[str]
expires_at: int | None = None
resource: str | None = None # RFC 8707 resource indicator
以下には実際に動作させるMCPサーバーのインスタンス化部分を記載しています。
今回実装するMCPサーバーは単純にサーバー時間を返すだけツール機能を用意したものをtransport="streamable-http"として実装しています。
またOAuthによる認可を導入するために、FastMCPクラスのコンストラクタにおけるauth引数にAuth0で設定した値などを設定しています。
# Create FastMCP instance as a Resource Server
mcp = FastMCP(
"Auth0 Protected MCP Server",
stateless_http=True,
json_response=True,
# Token verifier for authentication
token_verifier=Auth0TokenVerifier(),
# Auth settings for RFC 9728 Protected Resource Metadata
auth=AuthSettings(
issuer_url=AnyHttpUrl(f"https://{AUTH0_DOMAIN}"), # Auth0 Authorization Server URL
resource_server_url=AnyHttpUrl(f"http://{MCP_HOST}:{MCP_PORT}"),
required_scopes=[REQUIRED_PERMISSION],
),
host=MCP_HOST,
port=MCP_PORT
)
@mcp.tool()
def get_time() -> str:
"""Get the current time. (Format: YYYY-MM-DD HH:MM:SS)"""
now = datetime.datetime.now()
return now.strftime("%Y-%m-%d %H:%M:%S")
動作結果
動作させる場合はuv run mcp_server.py→uv run mcp_client.pyの順でMCPサーバー → MCPクライアントという順でPythonプログラムを起動していきます。
実際に「mcp_client.py」を動作させるとAuth0における動作させているアプリケーションに対してのアクセスを承認するための画面がブラウザで表示されるので、あらかじめアクセス権限を付与していおいたユーザーでログインし、承認します。
このプログラムでは、あらかじめアクセス権限を付与したユーザーでの承認が完了するとユーザーからの入力を受け付けるようになっているため、試しに「What time is it now ?」と入力してみます。すると、MCPクライアントを実行しているターミナルで以下のようなログが出力されていることを確認でき、AIアプリケーションが現在の日時を把握できていることが確認できます。
User input: What time is it now ?
2025-12-09 18:33:43,218 - __main__ - INFO - Agent response: The current time is 2025-12-09 18:33:41.
またMCPサーバー側のログを一部抜粋してみてみると、以下のようになっており、最初にMCPクライアントがアクセストークン無しでアクセスを試み、失敗していることが確認できます。
その後のやり取りに関しては、特にアクセスを拒否することなくMCPサーバーが動作していることの確認ができます。
INFO: Started server process [30916]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://localhost:5555 (Press CTRL+C to quit)
INFO: ::1:49683 - "POST /mcp HTTP/1.1" 401 Unauthorized
INFO: ::1:49683 - "GET /.well-known/oauth-protected-resource HTTP/1.1" 200 OK
INFO: ::1:57136 - "POST /mcp HTTP/1.1" 200 OK
INFO: ::1:57136 - "POST /mcp HTTP/1.1" 202 Accepted
INFO: ::1:51460 - "POST /mcp HTTP/1.1" 200 OK
INFO: ::1:51460 - "POST /mcp HTTP/1.1" 200 OK
INFO: ::1:51463 - "POST /mcp HTTP/1.1" 200 OK
INFO: ::1:51463 - "POST /mcp HTTP/1.1" 202 Accepted
INFO: ::1:51469 - "POST /mcp HTTP/1.1" 200 OK
INFO: ::1:51472 - "POST /mcp HTTP/1.1" 200 OK
INFO: ::1:51472 - "POST /mcp HTTP/1.1" 202 Accepted
INFO: ::1:51475 - "POST /mcp HTTP/1.1" 200 OK
INFO: ::1:51475 - "POST /mcp HTTP/1.1" 200 OK
次にあらかじめアクセス権限を付与していないユーザーによる承認を実施した場合のMCPサーバーのログは以下のようになっており、正常にアクセス権限が付与されていないことが確認できます。
INFO: Started server process [9912]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://localhost:5555 (Press CTRL+C to quit)
INFO: ::1:62816 - "POST /mcp HTTP/1.1" 401 Unauthorized
INFO: ::1:62816 - "GET /.well-known/oauth-protected-resource HTTP/1.1" 200 OK
INFO: ::1:54066 - "POST /mcp HTTP/1.1" 403 Forbidden
感想
今回はMCPに認可機能が実装されているということを記事執筆の半年くらい前に知っていたのですが、あまり情報が増えない & 実際にMCPサーバーを外部公開する際に必須な技術だと思ったのでこの記事を書いてみました。
MCPサーバーに市中の認証/認可サービスを用いたOAuthによるアクセス制限を実装する方法がなかなか見つからなかったため、公式が提供しているサンプルプログラムをメインの情報源として実装したため、なかなか大変でしたが、いい経験になったと思います。
(ちなみに一番苦労したのはAuth0の設定だったりします...)
MCP自体はまだまだ発展途上の規格だったりするので、今後のアップデートで今回作成したプログラムが動作しなくなったりする可能性はありますが、最新の規格と照らし合わせて参考にしてもらえると助かります。
また明日以降もNTTテクノクロスアドベントカレンダー2025の記事が投稿されるため引き続きお楽しみいただけると助かります。
参考にしたサイト
- Auth0を使う: Auth0から想定していないJWEが返ってきていた際の解決で参考させていただきました
- MCPドキュメント: MCPの基本的な情報は大体ここから取得しました
- Python SDK for MCP: 認可機能の実装は公式の公開しているサンプルプログラムをベースに作成しました
- OAuth 2.0 全フローの図解と動画: OAuthの流れを知るために参考にしました。


