1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Databricksを用いてModel Context Protocol(MCP)サーバーに接続してみる

Posted at

こちらを実践します。

上の記事のソースは抜粋なので、所々補完しています。この記事では以下のような分担になっています。

  • MCPクライアント: ローカルマシン上で実装されるPythonスクリプト
  • MCPサーバー: こちらにあるお天気ツール
  • リモートサービス: National Weather Service API

じゃあ、Databricksがどこにあるのかという話なのですが、この場合はMCPクライアントから呼び出されるLLMの部分です。基盤モデルAPI経由でLLMを使う構成となっています。

.env

Databricksへの接続情報を格納します。

.env
DATABRICKS_HOST=<Databricksワークスペースのホスト名>
DATABRICKS_TOKEN=<Databricksパーソナルアクセストークン>

weather.py

National Weather Service APIを呼び出すMCPサーバーを実装します。

weather.py
from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP

# FastMCPサーバーを初期化
mcp = FastMCP("weather")

# 定数
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"

# HTTPクライアントを初期化
http = httpx.AsyncClient(timeout=10.0)

async def make_nws_request(url: str) -> dict[str, Any] | None:
    """Make a request to the NWS API with proper error handling."""
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/geo+json"
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            response.raise_for_status()
            return response.json()
        except Exception:
            return None

def format_alert(feature: dict) -> str:
    """Format an alert feature into a readable string."""
    props = feature["properties"]
    return f"""
Event: {props.get('event', 'Unknown')}
Area: {props.get('areaDesc', 'Unknown')}
Severity: {props.get('severity', 'Unknown')}
Description: {props.get('description', 'No description available')}
Instructions: {props.get('instruction', 'No specific instructions provided')}
"""

@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
    """指定した場所の天気予報を取得します。

    引数:
        latitude: 場所の緯度
        longitude: 場所の経度
    """
    # まず予報グリッドのエンドポイントを取得
    points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
    points_data = await make_nws_request(points_url)

    if not points_data:
        return "この場所の予報データを取得できませんでした。"

    # レスポンスから予報URLを取得
    forecast_url = points_data["properties"]["forecast"]
    forecast_data = await make_nws_request(forecast_url)

    if not forecast_data:
        return "詳細な予報を取得できませんでした。"

    # 期間ごとに読みやすい予報文を作成
    periods = forecast_data["properties"]["periods"]
    forecasts = []
    for period in periods[:5]:  # 次の5期間のみ表示
        forecast = f"""
{period['name']}:
気温: {period['temperature']}°{period['temperatureUnit']}
風: {period['windSpeed']} {period['windDirection']}
予報: {period['detailedForecast']}
"""
        forecasts.append(forecast)

    return "\n---\n".join(forecasts)

@mcp.tool()
async def get_alerts(state: str) -> str:
    """アメリカの州ごとの天気警報を取得します。

    引数:
        state: 2文字の州コード(例: CA, NY)
    """
    url = f"{NWS_API_BASE}/alerts/active/area/{state}"
    data = await make_nws_request(url)

    if not data or "features" not in data:
        return "警報を取得できないか、警報が見つかりませんでした。"

    if not data["features"]:
        return "この州には現在有効な警報はありません。"

    alerts = [format_alert(feature) for feature in data["features"]]
    return "\n---\n".join(alerts)

if __name__ == "__main__":
    # サーバーを初期化して実行
    mcp.run(transport='stdio')

client.py

Databricks SDKを活用したMCPクライアントを実装します。

client.py
import asyncio
from typing import Optional
from contextlib import AsyncExitStack
import sys

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from databricks.sdk import WorkspaceClient
from dotenv import load_dotenv

import json

load_dotenv()  # .envから環境変数を読み込む

class MCPClient:
    def __init__(self, llm_endpoint_name: str = "databricks-meta-llama-3-3-70b-instruct"):
        # セッションとクライアントオブジェクトを初期化
        self.session: Optional[ClientSession] = None
        self.exit_stack = AsyncExitStack()
        
        # Databricksクライアントを初期化し、OpenAI互換クライアントを取得
        self.workspace = WorkspaceClient()
        self.llm_endpoint_name = llm_endpoint_name
        self.openai_client = self.workspace.serving_endpoints.get_open_ai_client()
        print(f"{llm_endpoint_name} 用のOpenAI互換クライアントを初期化しました")


    async def connect_to_server(self, server_script_path: str):
        """MCPサーバーに接続する"""
        is_python = server_script_path.endswith('.py')
        is_js = server_script_path.endswith('.js')
        if not (is_python or is_js):
            raise ValueError("サーバースクリプトは.pyまたは.jsファイルでなければなりません")
            
        command = "python" if is_python else "node"
        server_params = StdioServerParameters(
            command=command,
            args=[server_script_path],
            env=None
        )

        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))
        self.stdio, self.write = stdio_transport
        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))
        
        await self.session.initialize()
        
        # 利用可能なツールを一覧表示
        response = await self.session.list_tools()
        tools = response.tools
        print("\nサーバーに接続しました。利用可能なツール:", [tool.name for tool in tools])
        
        # ツールの説明を表示
        print("\n利用可能なツール:")
        for tool in tools:
            print(f"- {tool.name}: {tool.description}")

    def _convert_schema_to_openai_format(self, tool):
        """MCPツールのスキーマをOpenAI関数形式に変換"""
        # ツールから入力スキーマを抽出
        schema = tool.inputSchema
        
        # 関数定義を作成
        function_def = {
            "name": tool.name,
            "description": tool.description,
            "parameters": schema
        }
        
        # OpenAIツール形式で返す
        return {
            "type": "function",
            "function": function_def
        }

    async def process_query(self, query: str) -> str:
        """Meta Llamaと利用可能なツールを使ってクエリを処理する"""
        if not self.session:
            print("エラー: サーバーに接続されていません")
            return "サーバーに接続されていません。"

        # ユーザーのクエリで会話を初期化
        messages = [
            {"role": "user", "content": query}
        ]

        # 利用可能なツールを取得し、OpenAI形式に変換
        response = await self.session.list_tools()
        available_tools = [self._convert_schema_to_openai_format(tool) for tool in response.tools]

        final_text = []
        
        try:
            # 最初のLLM APIコールを実行
            print("ツール付きでLLMにクエリを送信中...")
            llm_response = self.openai_client.chat.completions.create(
                model=self.llm_endpoint_name,
                messages=messages,
                tools=available_tools,
                max_tokens=1000
            )
            
            # アシスタントの応答を取得
            assistant_message = llm_response.choices[0].message
            
            # アシスタントからの内容を出力に追加
            if assistant_message.content:
                final_text.append(assistant_message.content)
            
            # ツールコールがあるか確認
            if hasattr(assistant_message, 'tool_calls') and assistant_message.tool_calls:
                # 各ツールコールを処理
                for tool_call in assistant_message.tool_calls:
                    tool_name = tool_call.function.name
                    tool_args = json.loads(tool_call.function.arguments)

                    # ツールコールを実行
                    try:
                        final_text.append(f"\n[ツール呼び出し: {tool_name} 引数: {tool_args}]")
                        result = await self.session.call_tool(tool_name, tool_args)
        
                        # 結果の生テキストを抽出
                        raw_result = str(result.content)

                        # TextContentオブジェクトの場合、テキスト部分のみ抽出
                        if 'TextContent' in raw_result and "text='" in raw_result:
                            # クォート間のテキスト内容を取得
                            text_start = raw_result.index("text='") + 6
                            text_end = raw_result.rindex("'")
                            if text_start < text_end:
                                cleaned_result = raw_result[text_start:text_end]
                                # 改行やタブをアンエスケープ
                                cleaned_result = cleaned_result.replace('\\n', '\n').replace('\\t', '\t')
                                final_text.append(f"\n{tool_args.get('latitude', '')},{tool_args.get('longitude', '')} の予報:\n{cleaned_result}")
                            else:
                                final_text.append(f"\n結果: {raw_result}")
                        else:
                            final_text.append(f"\n結果: {raw_result}")
                    except Exception as e:
                        final_text.append(f"\nツール呼び出しエラー: {str(e)}")
            else:
                final_text.append(f"\n結果: {raw_result}")
        except Exception as e:
            final_text.append(f"\nエラー: {str(e)}")

        return "\n".join(final_text)


    async def chat_loop(self):
        """対話型チャットループを実行"""
        print("\nMCP天気クライアントを開始しました!")
        print("天気について質問できます。例:")
        print("- サンフランシスコの天気予報は?")
        print("- CAの天気警報はありますか?")
        print("- 緯度37.7749、経度-122.4194の予報を教えて")
        print("(終了するには 'quit' と入力)")
        
        while True:
            try:
                query = input("\n質問: ").strip()
                
                if query.lower() == 'quit':
                    break
                    
                if query.strip():
                    print("\nクエリを処理中...")
                    response = await self.process_query(query)
                    print("\n応答:")
                    print(response)
                    
            except Exception as e:
                print(f"\nエラー: {str(e)}")

    async def cleanup(self):
        """リソースのクリーンアップ"""
        await self.exit_stack.aclose()

async def main():
    # 引数がなければカレントディレクトリのweather.pyをデフォルトに
    server_script_path = sys.argv[1] if len(sys.argv) > 1 else "weather.py"
    
    # LLMエンドポイント名のオプション引数
    llm_endpoint_name = sys.argv[2] if len(sys.argv) > 2 else "databricks-meta-llama-3-3-70b-instruct"
        
    client = MCPClient(llm_endpoint_name)
    try:
        print(f"サーバーに接続中: {server_script_path}")
        await client.connect_to_server(server_script_path)
        await client.chat_loop()
    finally:
        await client.cleanup()

if __name__ == "__main__":
    asyncio.run(main())

MCPクライアントの実行

仮想環境を作ります。

python3 -m venv .venv
source .venv/bin/activate

必要なライブラリをインストールします。

pip install openai databricks-sdk httpx mcp

クライアントを実行します。

python client.py

サンフランシスコの天気を聞いてみます。get_forecastを呼び出して天気予報が返却されました。

Screenshot 2025-05-15 at 17.36.12.png

天気警報を聞いてみます。get_alertsが呼び出されました。

Screenshot 2025-05-15 at 17.36.40.png

MCPの基本的な動作の確認には役立ちますが、Databricksのデータをやり取りする構成の方が面白いですね。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?