3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Genie Conversation APIを活用したDatabricks MCPサーバーを作ってみる

Last updated at Posted at 2025-04-08

こちらの発展系です。

さらに理解を深めるには自分で作るしかないと。クラスター一覧の取得やSQL実行を行う既出なので、(おそらく)まだ誰も作っていないGenie Conversation APIを使ったMCPサーバーを作ってみます。

作る際にはAPIリファレンスが必須です。

作ったと言いましても、多くをこちらから流用させていただきました。ありがとうございます。

事前準備

Genie Conversation APIを使うにはアクセス先のGenieスペースが必要なので、作成および設定しておきます。テーブルや指示を設定しておきます。

Screenshot 2025-04-08 at 21.48.44.png
Screenshot 2025-04-08 at 21.48.52.png

以降でMCPサーバーを実装します。maim.pyおよび依存するファイルdbapi.pyatabricks_formatter.pyを実装し、設定ファイルを作成します。

MCPサーバーの実装

maim.py

maim.py
"""
Databricks Genie MCP Server
MCPを使用してGenieとの対話を可能にするサーバー
"""
from mcp.server.fastmcp import FastMCP
from dbapi import genie_conversation
from databricks_formatter import format_query_results

# MCPサーバーを初期化
# サーバー名を指定して新しいインスタンスを作成
mcp = FastMCP("Databricks Genie MCP Server")

@mcp.tool()
async def start_conversation(content: str) -> str:
    """
    Genie APIとの会話を開始する
    
    Args:
        content (str): Genieに送信するメッセージ内容
    
    Returns:
        str: Genieからのレスポンスを整形した結果
    """
    try:
        # Genie APIを呼び出して応答を取得
        result = await genie_conversation(content)
        # 応答結果を見やすい形式に整形
        return format_query_results(result)
    except Exception as e:
        # エラーが発生した場合はエラーメッセージを返す
        return f"Error talking to Genie: {str(e)}"

# メインエントリーポイント
if __name__ == "__main__":
    # デフォルトでstdioトランスポートを使用してサーバーを起動
    # これによりコマンドラインからの対話が可能になる
    mcp.run()

dbapi.py

Genie APIを呼び出すように変更しています。Genieの会話が完了するとStatement Executionの結果IDを取得できるので、それを指定して結果セットを取り出す流れとなります。

dbapi.py
"""
Databricks API通信モジュール
Genieとの対話に必要なAPI呼び出しを管理します
"""

from typing import Any, Dict, Optional
import os
import asyncio
import httpx
from dotenv import load_dotenv

# 環境変数を.envファイルから読み込み
load_dotenv()

# Databricks接続に必要な設定値
DATABRICKS_HOST = os.environ.get("DATABRICKS_HOST", "")
DATABRICKS_TOKEN = os.environ.get("DATABRICKS_TOKEN", "")
DATABRICKS_GENIE_SPACE_ID = os.environ.get("DATABRICKS_GENIE_SPACE_ID", "")

# APIエンドポイントの定義
GENIE_START_CONVERSATION_API = "/api/2.0/genie/spaces/{space_id}/start-conversation"  # 会話開始用エンドポイント
GENIE_GET_MESSAGE_API = "/api/2.0/genie/spaces/{space_id}/conversations/{conversation_id}/messages/{message_id}"  # メッセージ取得用エンドポイント
STATEMENT_API = "/api/2.0/sql/statements/{statement_id}"  # SQLステートメント実行結果取得用エンドポイント

async def make_databricks_request(
    method: str,
    endpoint: str,
    json_data: Optional[Dict[str, Any]] = None,
    params: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    DatabricksのAPIにリクエストを送信する汎用関数
    
    Args:
        method (str): HTTPメソッド('get'または'post')
        endpoint (str): APIエンドポイントのパス
        json_data (Optional[Dict[str, Any]]): POSTリクエスト用のJSONデータ
        params (Optional[Dict[str, Any]]): GETリクエスト用のクエリパラメータ
    
    Returns:
        Dict[str, Any]: APIレスポンスのJSONデータ
    """
    url = f"{DATABRICKS_HOST}{endpoint}"
    print(url)

    # 認証ヘッダーの設定
    headers = {
        "Authorization": f"Bearer {DATABRICKS_TOKEN}",
        "Content-Type": "application/json"
    }
    
    async with httpx.AsyncClient() as client:
        try:
            # HTTPメソッドに応じてリクエストを実行
            if method.lower() == "get":
                response = await client.get(url, headers=headers, params=params, timeout=30.0)
            elif method.lower() == "post":
                response = await client.post(url, headers=headers, json=json_data, timeout=30.0)
            else:
                raise ValueError(f"サポートされていないHTTPメソッド: {method}")
            
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            # HTTPエラーの詳細情報を取得して例外を発生
            error_message = f"HTTPエラー: {e.response.status_code}"
            try:
                error_detail = e.response.json()
                error_message += f" - {error_detail.get('message', '')}"
            except Exception:
                pass
            raise Exception(error_message)
        except Exception as e:
            raise Exception(f"Databricks APIリクエスト中にエラーが発生: {str(e)}")


async def genie_conversation(content: str, space_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Genieとの会話を実行し、結果が得られるまで待機する
    
    Args:
        content (str): Genieに送信するメッセージ内容
        space_id (Optional[str]): Genie Space ID(未指定時は環境変数から取得)
    
    Returns:
        Dict[str, Any]: 会話の実行結果
    """
    if not space_id:
        space_id = DATABRICKS_GENIE_SPACE_ID
    
    if not space_id:
        raise ValueError("Genie Space IDが必要です。環境変数DATABRICKS_GENIE_SPACE_IDを設定するか、パラメータとして指定してください。")
    
    # 会話開始リクエストの作成
    statement_data = {
        "content": content
    }
    
    # 会話を開始し、conversation_idとmessage_idを取得
    endpoint_url = GENIE_START_CONVERSATION_API.format(space_id=space_id)
    response = await make_databricks_request("post", endpoint_url, json_data=statement_data)
    message = response.get("message")
    conversation_id = message["conversation_id"]
    message_id = message["id"]

    if not conversation_id:
        raise Exception("レスポンスからconversation_IDの取得に失敗しました")
    
    # 会話生成完了をポーリング
    max_retries = 60  # 最大リトライ回数(10秒間隔で10分)
    retry_count = 0

    while retry_count < max_retries:
        # メッセージのステータスを確認
        message_status = await make_databricks_request(
            "get", 
            GENIE_GET_MESSAGE_API.format(space_id=space_id, conversation_id=conversation_id, message_id=message_id)
        )

        status = message_status["status"]
        
        if status == "COMPLETED":
            # 完了した場合、クエリ結果を取得
            query_result_statement_id = message_status["attachments"][0]["query"]["statement_id"]

            statement_status = await make_databricks_request(
                "get", 
                STATEMENT_API.format(statement_id=query_result_statement_id)
            )
            
            return statement_status
        elif status in ["FAILED", "CANCELED"]:
            error_message = message_status["status"]
            raise Exception(f"メッセージの取得に失敗: {error_message}")
        
        # 次のポーリングまで待機
        await asyncio.sleep(10)
        retry_count += 1
    
    return message["content"]

databricks_formatter.py

これはオリジナルのままです。結果セットを整形します。

from typing import Any, Dict


def format_query_results(result: Dict[str, Any]) -> str:
    """Format query results into a readable string."""

    # Check if result is empty or doesn't have the expected structure
    if not result or 'manifest' not in result or 'result' not in result:
        return "No results or invalid result format."
    
    # Extract column names from the manifest
    column_names = []
    if 'manifest' in result and 'schema' in result['manifest'] and 'columns' in result['manifest']['schema']:
        columns = result['manifest']['schema']['columns']
        column_names = [col['name'] for col in columns] if columns else []
    
    # If no column names were found, return early
    if not column_names:
        return "No columns found in the result."
    
    # Extract rows from the result
    rows = []
    if 'result' in result and 'data_array' in result['result']:
        rows = result['result']['data_array']
    
    # If no rows were found, return just the column headers
    if not rows:
        # Format as a table
        output = []
        
        # Add header
        output.append(" | ".join(column_names))
        output.append("-" * (sum(len(name) + 3 for name in column_names) - 1))
        output.append("No data rows found.")
        
        return "\n".join(output)
    
    # Format as a table
    output = []
    
    # Add header
    output.append(" | ".join(column_names))
    output.append("-" * (sum(len(name) + 3 for name in column_names) - 1))
    
    # Add rows
    for row in rows:
        row_values = []
        for value in row:
            if value is None:
                row_values.append("NULL")
            else:
                row_values.append(str(value))
        output.append(" | ".join(row_values))
    
    return "\n".join(output) 

pyproject.toml

依存ライブラリを記述します。

pyproject.toml
[project]
name = "databricks_test"
version = "0.1.0"
description = "Databricks Genie Integration"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "httpx>=0.28.1",
    "mcp[cli]>=1.3.0",
]

mcp.json

mcp.json
{
    "mcpServers": {
        "databricks_test": {
            "command": "uv",
            "args": [
                "--directory",
                "/Users/takaaki.yayoi/cursor/my_first_mcp_server",
                "run",
                "main.py"
            ],
            "env": {
                "DATABRICKS_HOST": "https://<Databricksワークスペースホスト名>",
                "DATABRICKS_TOKEN": "<パーソナルアクセストークン>",
                "DATABRICKS_GENIE_SPACE_ID": "<GenieスペースID>"
            }
        }
    }
}

ここまで設定したら、CursorでMCPサーバーを設定します。mcp.jsonを更新して、設定画面で再読み込みすれば、問題なければ緑ランプが表示されるはずです。

Screenshot 2025-04-08 at 21.51.31.png

注意
MCPサーバーを開発している過程で、変更点をCursorに反映させるには設定画面で更新ボタンを押しましょう。
Screenshot 2025-04-08 at 21.51.31.png

動作確認

Genieから東京の感染者数を取得といった形で、Genieに問い合わせるようなプロンプトを入力します。

動きました!

Screenshot 2025-04-08 at 21.53.34.png

Genieスペース側でも確認できます。

Screenshot 2025-04-08 at 21.54.45.png

一通り動きましたが、やっぱりこちらのドキュメントを読み込もうと思った次第です。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?