こちらの発展系です。
さらに理解を深めるには自分で作るしかないと。クラスター一覧の取得やSQL実行を行う既出なので、(おそらく)まだ誰も作っていないGenie Conversation APIを使ったMCPサーバーを作ってみます。
作る際にはAPIリファレンスが必須です。
作ったと言いましても、多くをこちらから流用させていただきました。ありがとうございます。
事前準備
Genie Conversation APIを使うにはアクセス先のGenieスペースが必要なので、作成および設定しておきます。テーブルや指示を設定しておきます。
以降でMCPサーバーを実装します。maim.py
および依存するファイルdbapi.py
とatabricks_formatter.py
を実装し、設定ファイルを作成します。
MCPサーバーの実装
maim.py
"""
Databricks Genie MCP Server
MCPを使用してGenieとの対話を可能にするサーバー
"""
from mcp.server.fastmcp import FastMCP
from dbapi import genie_conversation
from databricks_formatter import format_query_results
# MCPサーバーを初期化
# サーバー名を指定して新しいインスタンスを作成
mcp = FastMCP("Databricks Genie MCP Server")
@mcp.tool()
async def start_conversation(content: str) -> str:
"""
Genie APIとの会話を開始する
Args:
content (str): Genieに送信するメッセージ内容
Returns:
str: Genieからのレスポンスを整形した結果
"""
try:
# Genie APIを呼び出して応答を取得
result = await genie_conversation(content)
# 応答結果を見やすい形式に整形
return format_query_results(result)
except Exception as e:
# エラーが発生した場合はエラーメッセージを返す
return f"Error talking to Genie: {str(e)}"
# メインエントリーポイント
if __name__ == "__main__":
# デフォルトでstdioトランスポートを使用してサーバーを起動
# これによりコマンドラインからの対話が可能になる
mcp.run()
dbapi.py
Genie APIを呼び出すように変更しています。Genieの会話が完了するとStatement Executionの結果IDを取得できるので、それを指定して結果セットを取り出す流れとなります。
"""
Databricks API通信モジュール
Genieとの対話に必要なAPI呼び出しを管理します
"""
from typing import Any, Dict, Optional
import os
import asyncio
import httpx
from dotenv import load_dotenv
# 環境変数を.envファイルから読み込み
load_dotenv()
# Databricks接続に必要な設定値
DATABRICKS_HOST = os.environ.get("DATABRICKS_HOST", "")
DATABRICKS_TOKEN = os.environ.get("DATABRICKS_TOKEN", "")
DATABRICKS_GENIE_SPACE_ID = os.environ.get("DATABRICKS_GENIE_SPACE_ID", "")
# APIエンドポイントの定義
GENIE_START_CONVERSATION_API = "/api/2.0/genie/spaces/{space_id}/start-conversation" # 会話開始用エンドポイント
GENIE_GET_MESSAGE_API = "/api/2.0/genie/spaces/{space_id}/conversations/{conversation_id}/messages/{message_id}" # メッセージ取得用エンドポイント
STATEMENT_API = "/api/2.0/sql/statements/{statement_id}" # SQLステートメント実行結果取得用エンドポイント
async def make_databricks_request(
method: str,
endpoint: str,
json_data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
DatabricksのAPIにリクエストを送信する汎用関数
Args:
method (str): HTTPメソッド('get'または'post')
endpoint (str): APIエンドポイントのパス
json_data (Optional[Dict[str, Any]]): POSTリクエスト用のJSONデータ
params (Optional[Dict[str, Any]]): GETリクエスト用のクエリパラメータ
Returns:
Dict[str, Any]: APIレスポンスのJSONデータ
"""
url = f"{DATABRICKS_HOST}{endpoint}"
print(url)
# 認証ヘッダーの設定
headers = {
"Authorization": f"Bearer {DATABRICKS_TOKEN}",
"Content-Type": "application/json"
}
async with httpx.AsyncClient() as client:
try:
# HTTPメソッドに応じてリクエストを実行
if method.lower() == "get":
response = await client.get(url, headers=headers, params=params, timeout=30.0)
elif method.lower() == "post":
response = await client.post(url, headers=headers, json=json_data, timeout=30.0)
else:
raise ValueError(f"サポートされていないHTTPメソッド: {method}")
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
# HTTPエラーの詳細情報を取得して例外を発生
error_message = f"HTTPエラー: {e.response.status_code}"
try:
error_detail = e.response.json()
error_message += f" - {error_detail.get('message', '')}"
except Exception:
pass
raise Exception(error_message)
except Exception as e:
raise Exception(f"Databricks APIリクエスト中にエラーが発生: {str(e)}")
async def genie_conversation(content: str, space_id: Optional[str] = None) -> Dict[str, Any]:
"""
Genieとの会話を実行し、結果が得られるまで待機する
Args:
content (str): Genieに送信するメッセージ内容
space_id (Optional[str]): Genie Space ID(未指定時は環境変数から取得)
Returns:
Dict[str, Any]: 会話の実行結果
"""
if not space_id:
space_id = DATABRICKS_GENIE_SPACE_ID
if not space_id:
raise ValueError("Genie Space IDが必要です。環境変数DATABRICKS_GENIE_SPACE_IDを設定するか、パラメータとして指定してください。")
# 会話開始リクエストの作成
statement_data = {
"content": content
}
# 会話を開始し、conversation_idとmessage_idを取得
endpoint_url = GENIE_START_CONVERSATION_API.format(space_id=space_id)
response = await make_databricks_request("post", endpoint_url, json_data=statement_data)
message = response.get("message")
conversation_id = message["conversation_id"]
message_id = message["id"]
if not conversation_id:
raise Exception("レスポンスからconversation_IDの取得に失敗しました")
# 会話生成完了をポーリング
max_retries = 60 # 最大リトライ回数(10秒間隔で10分)
retry_count = 0
while retry_count < max_retries:
# メッセージのステータスを確認
message_status = await make_databricks_request(
"get",
GENIE_GET_MESSAGE_API.format(space_id=space_id, conversation_id=conversation_id, message_id=message_id)
)
status = message_status["status"]
if status == "COMPLETED":
# 完了した場合、クエリ結果を取得
query_result_statement_id = message_status["attachments"][0]["query"]["statement_id"]
statement_status = await make_databricks_request(
"get",
STATEMENT_API.format(statement_id=query_result_statement_id)
)
return statement_status
elif status in ["FAILED", "CANCELED"]:
error_message = message_status["status"]
raise Exception(f"メッセージの取得に失敗: {error_message}")
# 次のポーリングまで待機
await asyncio.sleep(10)
retry_count += 1
return message["content"]
databricks_formatter.py
これはオリジナルのままです。結果セットを整形します。
from typing import Any, Dict
def format_query_results(result: Dict[str, Any]) -> str:
"""Format query results into a readable string."""
# Check if result is empty or doesn't have the expected structure
if not result or 'manifest' not in result or 'result' not in result:
return "No results or invalid result format."
# Extract column names from the manifest
column_names = []
if 'manifest' in result and 'schema' in result['manifest'] and 'columns' in result['manifest']['schema']:
columns = result['manifest']['schema']['columns']
column_names = [col['name'] for col in columns] if columns else []
# If no column names were found, return early
if not column_names:
return "No columns found in the result."
# Extract rows from the result
rows = []
if 'result' in result and 'data_array' in result['result']:
rows = result['result']['data_array']
# If no rows were found, return just the column headers
if not rows:
# Format as a table
output = []
# Add header
output.append(" | ".join(column_names))
output.append("-" * (sum(len(name) + 3 for name in column_names) - 1))
output.append("No data rows found.")
return "\n".join(output)
# Format as a table
output = []
# Add header
output.append(" | ".join(column_names))
output.append("-" * (sum(len(name) + 3 for name in column_names) - 1))
# Add rows
for row in rows:
row_values = []
for value in row:
if value is None:
row_values.append("NULL")
else:
row_values.append(str(value))
output.append(" | ".join(row_values))
return "\n".join(output)
pyproject.toml
依存ライブラリを記述します。
[project]
name = "databricks_test"
version = "0.1.0"
description = "Databricks Genie Integration"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"httpx>=0.28.1",
"mcp[cli]>=1.3.0",
]
mcp.json
{
"mcpServers": {
"databricks_test": {
"command": "uv",
"args": [
"--directory",
"/Users/takaaki.yayoi/cursor/my_first_mcp_server",
"run",
"main.py"
],
"env": {
"DATABRICKS_HOST": "https://<Databricksワークスペースホスト名>",
"DATABRICKS_TOKEN": "<パーソナルアクセストークン>",
"DATABRICKS_GENIE_SPACE_ID": "<GenieスペースID>"
}
}
}
}
ここまで設定したら、CursorでMCPサーバーを設定します。mcp.json
を更新して、設定画面で再読み込みすれば、問題なければ緑ランプが表示されるはずです。
動作確認
Genieから東京の感染者数を取得
といった形で、Genieに問い合わせるようなプロンプトを入力します。
動きました!
Genieスペース側でも確認できます。
一通り動きましたが、やっぱりこちらのドキュメントを読み込もうと思った次第です。