背景
- StreamlitをMCPのホストとして使えないか検証してみます
- とりあえず、こんな感じのUIにしてBrave SearchのMCP Serverに繋ぎ込めるかやってみました
docs
- docのMCP clientの構築を参考にしました
実装
- Streamlitは同期的なので、serverを立てるときは別のスレッドで立ち上げる処理を書きました
- process_queryのたびにserverの接続をするようにしました
- 初回でserver立ち上げてst.session_state.sessionに持たせようとしましたが、うまくいきませんでした
import asyncio
from contextlib import AsyncExitStack
import threading
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from anthropic import Anthropic
from dotenv import load_dotenv
import streamlit as st
import os
load_dotenv() # load environment variables from .env
# イベントループを別スレッドで回す設定
new_loop = asyncio.new_event_loop()
def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
threading.Thread(target=start_loop, args=(new_loop,), daemon=True).start()
def run_async(coro):
"""
別スレッドのイベントループで非同期処理を実行するヘルパー関数
"""
return asyncio.run_coroutine_threadsafe(coro, new_loop).result()
async def connect_to_server() -> ClientSession:
"""Connect to an MCP server
Args:
server_script_path: Path to the server script (.py or .js)
"""
exit_stack = AsyncExitStack()
server_params = StdioServerParameters(
command="npx",
args=["@modelcontextprotocol/server-brave-search"],
env={
"BRAVE_API_KEY": os.getenv("BRAVE_API_KEY")
}
)
stdio_transport = await exit_stack.enter_async_context(stdio_client(server_params))
stdio, write = stdio_transport
session = await exit_stack.enter_async_context(ClientSession(stdio, write))
await session.initialize()
tool_response = await session.list_tools()
available_tools = [{
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema
} for tool in tool_response.tools]
print("\nConnected to server with tools:", [tool["name"] for tool in available_tools])
return session, exit_stack, available_tools
async def disconnect_server(exit_stack: AsyncExitStack):
"""
exit_stack をクローズして、接続を明示的に切断する
"""
try:
await exit_stack.aclose()
except Exception as e:
print("Error during disconnect:", e)
async def process_query(query: str) -> str:
session, exit_stack, available_tools = await connect_to_server()
anthropic = Anthropic()
"""Process a query using Claude and available tools"""
messages = [
{
"role": "user",
"content": query
}
]
# Initial Claude API call
response = anthropic.messages.create(
model="claude-3-7-sonnet-latest",
max_tokens=1000,
messages=messages,
tools=available_tools
)
# Process response and handle tool calls
final_text = []
assistant_message_content = []
for content in response.content:
if content.type == 'text':
final_text.append(content.text)
assistant_message_content.append(content)
elif content.type == 'tool_use':
tool_name = content.name
tool_args = content.input
result = await asyncio.wait_for(
session.call_tool(tool_name, tool_args), timeout=30
)
final_text.append(f"[Tool {tool_name} returned: {result.content}]")
assistant_message_content.append(content)
messages.append({
"role": "assistant",
"content": assistant_message_content
})
messages.append({
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": content.id,
"content": result.content if result else "Timed out"
}
]
})
# Get next response from Claude
response = anthropic.messages.create(
model="claude-3-7-sonnet-latest",
max_tokens=1000,
messages=messages,
tools=st.session_state.available_tools
)
final_text.append(response.content[0].text)
await disconnect_server(exit_stack)
st.session_state.result = "\n".join(final_text)
print(final_text)
return "\n".join(final_text)
def main_st():
st.text_area("Query", key="query")
if st.button("Submit", key="submit"):
result = run_async(process_query(st.session_state.query))
st.markdown(result)
if __name__ == "__main__":
main_st()
- 動いた!
- "日本の現在の総理大臣は?" とか聞くと、ちゃんとbrave searchで検索して結果を返してくれました
感想
- ちょっとめんどくさいけど、できなくはない
- 社内ツール用なんかにMCPServerをいくつか入れてプロンプトチューニングしたChatを用意することなんかはサクッとできそうですね
- MCP触ってるとchatUIで全部すんじゃうなって感じになってきますね
- 次はAgentsSDK + MCP触りたい