こちらを実践します。
上の記事のソースは抜粋なので、所々補完しています。この記事では以下のような分担になっています。
- MCPクライアント: ローカルマシン上で実装されるPythonスクリプト
- MCPサーバー: こちらにあるお天気ツール
- リモートサービス: National Weather Service API
じゃあ、Databricksがどこにあるのかという話なのですが、この場合はMCPクライアントから呼び出されるLLMの部分です。基盤モデルAPI経由でLLMを使う構成となっています。
.env
Databricksへの接続情報を格納します。
.env
DATABRICKS_HOST=<Databricksワークスペースのホスト名>
DATABRICKS_TOKEN=<Databricksパーソナルアクセストークン>
weather.py
National Weather Service APIを呼び出すMCPサーバーを実装します。
weather.py
from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP
# FastMCPサーバーを初期化
mcp = FastMCP("weather")
# 定数
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"
# HTTPクライアントを初期化
http = httpx.AsyncClient(timeout=10.0)
async def make_nws_request(url: str) -> dict[str, Any] | None:
"""Make a request to the NWS API with proper error handling."""
headers = {
"User-Agent": USER_AGENT,
"Accept": "application/geo+json"
}
async with httpx.AsyncClient() as client:
try:
response = await client.get(url, headers=headers, timeout=30.0)
response.raise_for_status()
return response.json()
except Exception:
return None
def format_alert(feature: dict) -> str:
"""Format an alert feature into a readable string."""
props = feature["properties"]
return f"""
Event: {props.get('event', 'Unknown')}
Area: {props.get('areaDesc', 'Unknown')}
Severity: {props.get('severity', 'Unknown')}
Description: {props.get('description', 'No description available')}
Instructions: {props.get('instruction', 'No specific instructions provided')}
"""
@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
"""指定した場所の天気予報を取得します。
引数:
latitude: 場所の緯度
longitude: 場所の経度
"""
# まず予報グリッドのエンドポイントを取得
points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
points_data = await make_nws_request(points_url)
if not points_data:
return "この場所の予報データを取得できませんでした。"
# レスポンスから予報URLを取得
forecast_url = points_data["properties"]["forecast"]
forecast_data = await make_nws_request(forecast_url)
if not forecast_data:
return "詳細な予報を取得できませんでした。"
# 期間ごとに読みやすい予報文を作成
periods = forecast_data["properties"]["periods"]
forecasts = []
for period in periods[:5]: # 次の5期間のみ表示
forecast = f"""
{period['name']}:
気温: {period['temperature']}°{period['temperatureUnit']}
風: {period['windSpeed']} {period['windDirection']}
予報: {period['detailedForecast']}
"""
forecasts.append(forecast)
return "\n---\n".join(forecasts)
@mcp.tool()
async def get_alerts(state: str) -> str:
"""アメリカの州ごとの天気警報を取得します。
引数:
state: 2文字の州コード(例: CA, NY)
"""
url = f"{NWS_API_BASE}/alerts/active/area/{state}"
data = await make_nws_request(url)
if not data or "features" not in data:
return "警報を取得できないか、警報が見つかりませんでした。"
if not data["features"]:
return "この州には現在有効な警報はありません。"
alerts = [format_alert(feature) for feature in data["features"]]
return "\n---\n".join(alerts)
if __name__ == "__main__":
# サーバーを初期化して実行
mcp.run(transport='stdio')
client.py
Databricks SDKを活用したMCPクライアントを実装します。
client.py
import asyncio
from typing import Optional
from contextlib import AsyncExitStack
import sys
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from databricks.sdk import WorkspaceClient
from dotenv import load_dotenv
import json
load_dotenv() # .envから環境変数を読み込む
class MCPClient:
def __init__(self, llm_endpoint_name: str = "databricks-meta-llama-3-3-70b-instruct"):
# セッションとクライアントオブジェクトを初期化
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
# Databricksクライアントを初期化し、OpenAI互換クライアントを取得
self.workspace = WorkspaceClient()
self.llm_endpoint_name = llm_endpoint_name
self.openai_client = self.workspace.serving_endpoints.get_open_ai_client()
print(f"{llm_endpoint_name} 用のOpenAI互換クライアントを初期化しました")
async def connect_to_server(self, server_script_path: str):
"""MCPサーバーに接続する"""
is_python = server_script_path.endswith('.py')
is_js = server_script_path.endswith('.js')
if not (is_python or is_js):
raise ValueError("サーバースクリプトは.pyまたは.jsファイルでなければなりません")
command = "python" if is_python else "node"
server_params = StdioServerParameters(
command=command,
args=[server_script_path],
env=None
)
stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
await self.session.initialize()
# 利用可能なツールを一覧表示
response = await self.session.list_tools()
tools = response.tools
print("\nサーバーに接続しました。利用可能なツール:", [tool.name for tool in tools])
# ツールの説明を表示
print("\n利用可能なツール:")
for tool in tools:
print(f"- {tool.name}: {tool.description}")
def _convert_schema_to_openai_format(self, tool):
"""MCPツールのスキーマをOpenAI関数形式に変換"""
# ツールから入力スキーマを抽出
schema = tool.inputSchema
# 関数定義を作成
function_def = {
"name": tool.name,
"description": tool.description,
"parameters": schema
}
# OpenAIツール形式で返す
return {
"type": "function",
"function": function_def
}
async def process_query(self, query: str) -> str:
"""Meta Llamaと利用可能なツールを使ってクエリを処理する"""
if not self.session:
print("エラー: サーバーに接続されていません")
return "サーバーに接続されていません。"
# ユーザーのクエリで会話を初期化
messages = [
{"role": "user", "content": query}
]
# 利用可能なツールを取得し、OpenAI形式に変換
response = await self.session.list_tools()
available_tools = [self._convert_schema_to_openai_format(tool) for tool in response.tools]
final_text = []
try:
# 最初のLLM APIコールを実行
print("ツール付きでLLMにクエリを送信中...")
llm_response = self.openai_client.chat.completions.create(
model=self.llm_endpoint_name,
messages=messages,
tools=available_tools,
max_tokens=1000
)
# アシスタントの応答を取得
assistant_message = llm_response.choices[0].message
# アシスタントからの内容を出力に追加
if assistant_message.content:
final_text.append(assistant_message.content)
# ツールコールがあるか確認
if hasattr(assistant_message, 'tool_calls') and assistant_message.tool_calls:
# 各ツールコールを処理
for tool_call in assistant_message.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
# ツールコールを実行
try:
final_text.append(f"\n[ツール呼び出し: {tool_name} 引数: {tool_args}]")
result = await self.session.call_tool(tool_name, tool_args)
# 結果の生テキストを抽出
raw_result = str(result.content)
# TextContentオブジェクトの場合、テキスト部分のみ抽出
if 'TextContent' in raw_result and "text='" in raw_result:
# クォート間のテキスト内容を取得
text_start = raw_result.index("text='") + 6
text_end = raw_result.rindex("'")
if text_start < text_end:
cleaned_result = raw_result[text_start:text_end]
# 改行やタブをアンエスケープ
cleaned_result = cleaned_result.replace('\\n', '\n').replace('\\t', '\t')
final_text.append(f"\n{tool_args.get('latitude', '')},{tool_args.get('longitude', '')} の予報:\n{cleaned_result}")
else:
final_text.append(f"\n結果: {raw_result}")
else:
final_text.append(f"\n結果: {raw_result}")
except Exception as e:
final_text.append(f"\nツール呼び出しエラー: {str(e)}")
else:
final_text.append(f"\n結果: {raw_result}")
except Exception as e:
final_text.append(f"\nエラー: {str(e)}")
return "\n".join(final_text)
async def chat_loop(self):
"""対話型チャットループを実行"""
print("\nMCP天気クライアントを開始しました!")
print("天気について質問できます。例:")
print("- サンフランシスコの天気予報は?")
print("- CAの天気警報はありますか?")
print("- 緯度37.7749、経度-122.4194の予報を教えて")
print("(終了するには 'quit' と入力)")
while True:
try:
query = input("\n質問: ").strip()
if query.lower() == 'quit':
break
if query.strip():
print("\nクエリを処理中...")
response = await self.process_query(query)
print("\n応答:")
print(response)
except Exception as e:
print(f"\nエラー: {str(e)}")
async def cleanup(self):
"""リソースのクリーンアップ"""
await self.exit_stack.aclose()
async def main():
# 引数がなければカレントディレクトリのweather.pyをデフォルトに
server_script_path = sys.argv[1] if len(sys.argv) > 1 else "weather.py"
# LLMエンドポイント名のオプション引数
llm_endpoint_name = sys.argv[2] if len(sys.argv) > 2 else "databricks-meta-llama-3-3-70b-instruct"
client = MCPClient(llm_endpoint_name)
try:
print(f"サーバーに接続中: {server_script_path}")
await client.connect_to_server(server_script_path)
await client.chat_loop()
finally:
await client.cleanup()
if __name__ == "__main__":
asyncio.run(main())
MCPクライアントの実行
仮想環境を作ります。
python3 -m venv .venv
source .venv/bin/activate
必要なライブラリをインストールします。
pip install openai databricks-sdk httpx mcp
クライアントを実行します。
python client.py
サンフランシスコの天気を聞いてみます。get_forecast
を呼び出して天気予報が返却されました。
天気警報を聞いてみます。get_alerts
が呼び出されました。
MCPの基本的な動作の確認には役立ちますが、Databricksのデータをやり取りする構成の方が面白いですね。