はじめに
第1章では、SupabaseとModel Context Protocol(MCP)の基本を学び、Supabaseのprojects
テーブルからデータを取得するシンプルなMCPサーバーを構築しました。これにより、AIがプロジェクト情報を取得し、基本的なデータ連携を体験できました。今回は、Supabaseの強力なリアルタイム機能(WebSocketサブスクリプション)を活用し、データ変更を即座にAIに通知するMCPサーバーを構築します。
この第2章では、Supabaseのサブスクリプションを使って、テーブルに新しいプロジェクトが追加されたときにリアルタイムで通知を受け取り、AIがその情報を処理できるようにします。たとえば、プロジェクトが追加された瞬間にAIが通知を生成したり、分析を開始したりできます。コード例とステップごとのガイドで、リアルタイムAIの可能性を体感しましょう。さあ、始めましょう!
Supabaseのリアルタイム機能:サブスクリプションとは?
Supabaseのリアルタイム機能は、PostgreSQLのリアルタイムリスニングをベースに、WebSocketを通じてデータ変更(挿入、更新、削除)をクライアントに通知します。MCPサーバーにこの機能を統合することで、AIがデータベースの変更に即座に対応可能です。
主な特徴
- 即時性:データ変更がミリ秒単位で通知。
- 柔軟性:特定のテーブルや条件(例:特定の列の変更)に絞って監視。
- スケーラブル:複数のクライアントが同時にサブスクライブ可能。
ユースケース
- プロジェクト管理:新しいタスクが追加されたら、AIが自動で優先度を提案。
- コミュニティ:新しいメッセージが投稿されたら、AIが即座に返信。
- 分析:データ更新時にAIがリアルタイムで統計を生成。
開発環境の準備
第1章の環境を基に、以下の追加準備を行います:
- supabase-pyライブラリ:リアルタイムサブスクリプションをサポート(既にインストール済み)。
- Python 3.8以降、mcpライブラリ、Claude Desktop:第1章と同じ。
- WebSocketサポート:SupabaseのリアルタイムAPIを使用。
Supabaseのリアルタイム設定
-
リアルタイム有効化:
- Supabaseダッシュボードの「Database」→「Replication」で、
projects
テーブルのリアルタイムを有効化。 - または、SQLエディタで以下を実行:
ALTER TABLE projects REPLICA IDENTITY FULL;
- Supabaseダッシュボードの「Database」→「Replication」で、
-
環境変数:第1章の
.env
ファイル(SUPABASE_URL
、SUPABASE_KEY
)を再利用。 -
テストデータ:第1章の
projects
テーブルを使用。必要に応じて新しいデータを追加:INSERT INTO projects (name, description) VALUES ('プロジェクトC', 'リアルタイムテスト');
コード例:リアルタイムSupabase用MCPサーバー
以下のMCPサーバーは、Supabaseのprojects
テーブルを監視し、新しいレコードが追加されたときに通知を生成します。さらに、AIが最新データを取得できるリソースを提供します。
from mcp import MCPServer
from supabase import create_client, Client
import os
from dotenv import load_dotenv
import asyncio
import json
class SupabaseRealtimeServer(MCPServer):
def __init__(self, host, port, supabase_url, supabase_key):
super().__init__(host, port)
self.supabase: Client = create_client(supabase_url, supabase_key)
self.latest_event = None
self.register_resource("get_latest_event", self.get_latest_event)
self.register_resource("get_projects", self.get_projects)
asyncio.create_task(self.start_realtime_subscription())
async def start_realtime_subscription(self):
try:
self.supabase.realtime.connect()
channel = self.supabase.channel("projects")
channel.on("INSERT", self.handle_insert).subscribe()
print("リアルタイムサブスクリプションを開始しました")
except Exception as e:
print(f"サブスクリプションエラー: {str(e)}")
def handle_insert(self, payload):
self.latest_event = payload
print(f"新しいプロジェクト追加: {json.dumps(payload, ensure_ascii=False)}")
def get_latest_event(self, params):
try:
if self.latest_event:
return {"status": "success", "event": self.latest_event}
return {"status": "success", "event": None, "message": "イベントなし"}
except Exception as e:
return {"status": "error", "message": str(e)}
def get_projects(self, params):
try:
response = self.supabase.table("projects").select("*").execute()
projects = response.data
return {"status": "success", "projects": projects}
except Exception as e:
return {"status": "error", "message": str(e)}
if __name__ == "__main__":
load_dotenv()
server = SupabaseRealtimeServer(
host="localhost",
port=8095,
supabase_url=os.getenv("SUPABASE_URL"),
supabase_key=os.getenv("SUPABASE_KEY")
)
print("SupabaseリアルタイムMCPサーバーを起動中: http://localhost:8095")
server.start()
コードの説明
-
start_realtime_subscription:SupabaseのWebSocketで
projects
テーブルを監視。INSERT
イベントに反応。 - handle_insert:新しいレコードが追加されたとき、イベントを保存し、コンソールにログ出力。
- get_latest_event:最新のイベントデータをAIに提供。
- get_projects:第1章と同様、テーブル全体のデータを取得。
- asyncio.create_task:非同期でリアルタイムサブスクリプションを管理。
前提条件
- Supabaseプロジェクトでリアルタイムが有効化済み。
-
.env
ファイルに正しいSUPABASE_URL
とSUPABASE_KEY
が設定済み。
サーバーのテスト
サーバーが正しく動作するか確認します:
-
サーバー起動:
python supabase_realtime_server.py
コンソールに「SupabaseリアルタイムMCPサーバーを起動中: http://localhost:8095」と表示。
-
リアルタイムイベントのテスト:
- Supabaseダッシュボードの「Table Editor」またはSQLエディタで新しいプロジェクトを追加:
INSERT INTO projects (name, description) VALUES ('プロジェクトD', 'リアルタイム通知テスト');
- サーバーのコンソールに以下のようなログが表示:
新しいプロジェクト追加: {"record": {"id": 3, "name": "プロジェクトD", "description": "リアルタイム通知テスト", "created_at": "2025-04-16T00:00:00"}}
- Supabaseダッシュボードの「Table Editor」またはSQLエディタで新しいプロジェクトを追加:
-
最新イベントの取得:
Pythonでリクエストを送信:import requests import json url = "http://localhost:8095" payload = { "jsonrpc": "2.0", "method": "get_latest_event", "params": {}, "id": 1 } response = requests.post(url, json=payload) print(json.dumps(response.json(), indent=2, ensure_ascii=False))
期待されるレスポンス:
{ "jsonrpc": "2.0", "result": { "status": "success", "event": { "record": { "id": 3, "name": "プロジェクトD", "description": "リアルタイム通知テスト", "created_at": "2025-04-16T00:00:00" } } }, "id": 1 }
Claude Desktopとの接続
サーバーをClaude Desktopに接続します:
-
設定ファイルの編集:
Claude Desktopの設定ファイル(例:claude_desktop_config.json
)に以下を追加:{ "mcp_servers": [ { "name": "SupabaseRealtimeServer", "url": "http://localhost:8095", "auth": "none" } ] }
-
Claudeでテスト:
Claude Desktopを起動し、プロンプトを入力:最新のプロジェクト追加イベントを教えてください。
レスポンス例:
最新のプロジェクト追加: - 名前:プロジェクトD - 説明:リアルタイム通知テスト - 追加日:2025-04-16
実装のコツと注意点
- エラーハンドリング:WebSocket接続が切断された場合、再接続ロジックを追加。
- スケーラビリティ:大量のイベントを処理する場合、イベントをキューイング(例:Redis)。
- セキュリティ:本番環境では、APIキーを保護し、HTTPSを有効化。
- テスト:テスト用テーブルを作成し、本番データに影響を与えないよう注意。
- 制限:Supabaseの無料枠では同時接続数に制限あり。有料プランを検討。
試してみよう:挑戦課題
以下の機能を追加して、サーバーを強化してみてください:
-
UPDATE
イベント(プロジェクト更新)を監視し、変更内容を通知。 - 最新イベントにフィルタ(例:特定のプロジェクト名)を追加。
- イベント受信時にSlackやメールで通知するツールを登録。
まとめと次のステップ
この第2章では、SupabaseのリアルタイムサブスクリプションをMCPサーバーに統合し、データ変更を即座にAIに通知する機能を実装しました。これにより、エージェントAIが動的なデータに対応できるようになりました。
次の第3章では、Supabaseのデータを活用してデータ分析エージェントを構築します。たとえば、プロジェクトの進捗や傾向をAIが分析し、提案を生成します。リアルタイムデータ分析に興味がある方は、ぜひお楽しみに!
役に立ったと思ったら、「いいね」や「ストック」をしていただけると嬉しいです!次の章でまたお会いしましょう!