2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

SupabaseでリアルタイムAIを構築する | 第2章:Supabaseのリアルタイム機能を活用

Posted at

はじめに

第1章では、SupabaseModel Context Protocol(MCP)の基本を学び、Supabaseのprojectsテーブルからデータを取得するシンプルなMCPサーバーを構築しました。これにより、AIがプロジェクト情報を取得し、基本的なデータ連携を体験できました。今回は、Supabaseの強力なリアルタイム機能(WebSocketサブスクリプション)を活用し、データ変更を即座にAIに通知するMCPサーバーを構築します。

この第2章では、Supabaseのサブスクリプションを使って、テーブルに新しいプロジェクトが追加されたときにリアルタイムで通知を受け取り、AIがその情報を処理できるようにします。たとえば、プロジェクトが追加された瞬間にAIが通知を生成したり、分析を開始したりできます。コード例とステップごとのガイドで、リアルタイムAIの可能性を体感しましょう。さあ、始めましょう!

Supabaseのリアルタイム機能:サブスクリプションとは?

Supabaseのリアルタイム機能は、PostgreSQLのリアルタイムリスニングをベースに、WebSocketを通じてデータ変更(挿入、更新、削除)をクライアントに通知します。MCPサーバーにこの機能を統合することで、AIがデータベースの変更に即座に対応可能です。

主な特徴

  • 即時性:データ変更がミリ秒単位で通知。
  • 柔軟性:特定のテーブルや条件(例:特定の列の変更)に絞って監視。
  • スケーラブル:複数のクライアントが同時にサブスクライブ可能。

ユースケース

  • プロジェクト管理:新しいタスクが追加されたら、AIが自動で優先度を提案。
  • コミュニティ:新しいメッセージが投稿されたら、AIが即座に返信。
  • 分析:データ更新時にAIがリアルタイムで統計を生成。

開発環境の準備

第1章の環境を基に、以下の追加準備を行います:

  • supabase-pyライブラリ:リアルタイムサブスクリプションをサポート(既にインストール済み)。
  • Python 3.8以降mcpライブラリClaude Desktop:第1章と同じ。
  • WebSocketサポート:SupabaseのリアルタイムAPIを使用。

Supabaseのリアルタイム設定

  1. リアルタイム有効化
    • Supabaseダッシュボードの「Database」→「Replication」で、projectsテーブルのリアルタイムを有効化。
    • または、SQLエディタで以下を実行:
      ALTER TABLE projects REPLICA IDENTITY FULL;
      
  2. 環境変数:第1章の.envファイル(SUPABASE_URLSUPABASE_KEY)を再利用。
  3. テストデータ:第1章のprojectsテーブルを使用。必要に応じて新しいデータを追加:
    INSERT INTO projects (name, description) VALUES
    ('プロジェクトC', 'リアルタイムテスト');
    

コード例:リアルタイムSupabase用MCPサーバー

以下のMCPサーバーは、Supabaseのprojectsテーブルを監視し、新しいレコードが追加されたときに通知を生成します。さらに、AIが最新データを取得できるリソースを提供します。

from mcp import MCPServer
from supabase import create_client, Client
import os
from dotenv import load_dotenv
import asyncio
import json

class SupabaseRealtimeServer(MCPServer):
    def __init__(self, host, port, supabase_url, supabase_key):
        super().__init__(host, port)
        self.supabase: Client = create_client(supabase_url, supabase_key)
        self.latest_event = None
        self.register_resource("get_latest_event", self.get_latest_event)
        self.register_resource("get_projects", self.get_projects)
        asyncio.create_task(self.start_realtime_subscription())

    async def start_realtime_subscription(self):
        try:
            self.supabase.realtime.connect()
            channel = self.supabase.channel("projects")
            channel.on("INSERT", self.handle_insert).subscribe()
            print("リアルタイムサブスクリプションを開始しました")
        except Exception as e:
            print(f"サブスクリプションエラー: {str(e)}")

    def handle_insert(self, payload):
        self.latest_event = payload
        print(f"新しいプロジェクト追加: {json.dumps(payload, ensure_ascii=False)}")

    def get_latest_event(self, params):
        try:
            if self.latest_event:
                return {"status": "success", "event": self.latest_event}
            return {"status": "success", "event": None, "message": "イベントなし"}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def get_projects(self, params):
        try:
            response = self.supabase.table("projects").select("*").execute()
            projects = response.data
            return {"status": "success", "projects": projects}
        except Exception as e:
            return {"status": "error", "message": str(e)}

if __name__ == "__main__":
    load_dotenv()
    server = SupabaseRealtimeServer(
        host="localhost",
        port=8095,
        supabase_url=os.getenv("SUPABASE_URL"),
        supabase_key=os.getenv("SUPABASE_KEY")
    )
    print("SupabaseリアルタイムMCPサーバーを起動中: http://localhost:8095")
    server.start()

コードの説明

  • start_realtime_subscription:SupabaseのWebSocketでprojectsテーブルを監視。INSERTイベントに反応。
  • handle_insert:新しいレコードが追加されたとき、イベントを保存し、コンソールにログ出力。
  • get_latest_event:最新のイベントデータをAIに提供。
  • get_projects:第1章と同様、テーブル全体のデータを取得。
  • asyncio.create_task:非同期でリアルタイムサブスクリプションを管理。

前提条件

  • Supabaseプロジェクトでリアルタイムが有効化済み。
  • .envファイルに正しいSUPABASE_URLSUPABASE_KEYが設定済み。

サーバーのテスト

サーバーが正しく動作するか確認します:

  1. サーバー起動

    python supabase_realtime_server.py
    

    コンソールに「SupabaseリアルタイムMCPサーバーを起動中: http://localhost:8095」と表示。

  2. リアルタイムイベントのテスト

    • Supabaseダッシュボードの「Table Editor」またはSQLエディタで新しいプロジェクトを追加:
      INSERT INTO projects (name, description) VALUES
      ('プロジェクトD', 'リアルタイム通知テスト');
      
    • サーバーのコンソールに以下のようなログが表示:
      新しいプロジェクト追加: {"record": {"id": 3, "name": "プロジェクトD", "description": "リアルタイム通知テスト", "created_at": "2025-04-16T00:00:00"}}
      
  3. 最新イベントの取得
    Pythonでリクエストを送信:

    import requests
    import json
    
    url = "http://localhost:8095"
    payload = {
        "jsonrpc": "2.0",
        "method": "get_latest_event",
        "params": {},
        "id": 1
    }
    response = requests.post(url, json=payload)
    print(json.dumps(response.json(), indent=2, ensure_ascii=False))
    

    期待されるレスポンス:

    {
      "jsonrpc": "2.0",
      "result": {
        "status": "success",
        "event": {
          "record": {
            "id": 3,
            "name": "プロジェクトD",
            "description": "リアルタイム通知テスト",
            "created_at": "2025-04-16T00:00:00"
          }
        }
      },
      "id": 1
    }
    

Claude Desktopとの接続

サーバーをClaude Desktopに接続します:

  1. 設定ファイルの編集
    Claude Desktopの設定ファイル(例:claude_desktop_config.json)に以下を追加:

    {
      "mcp_servers": [
        {
          "name": "SupabaseRealtimeServer",
          "url": "http://localhost:8095",
          "auth": "none"
        }
      ]
    }
    
  2. Claudeでテスト
    Claude Desktopを起動し、プロンプトを入力:

    最新のプロジェクト追加イベントを教えてください。
    

    レスポンス例:

    最新のプロジェクト追加:
    - 名前:プロジェクトD
    - 説明:リアルタイム通知テスト
    - 追加日:2025-04-16
    

実装のコツと注意点

  • エラーハンドリング:WebSocket接続が切断された場合、再接続ロジックを追加。
  • スケーラビリティ:大量のイベントを処理する場合、イベントをキューイング(例:Redis)。
  • セキュリティ:本番環境では、APIキーを保護し、HTTPSを有効化。
  • テスト:テスト用テーブルを作成し、本番データに影響を与えないよう注意。
  • 制限:Supabaseの無料枠では同時接続数に制限あり。有料プランを検討。

試してみよう:挑戦課題

以下の機能を追加して、サーバーを強化してみてください:

  • UPDATEイベント(プロジェクト更新)を監視し、変更内容を通知。
  • 最新イベントにフィルタ(例:特定のプロジェクト名)を追加。
  • イベント受信時にSlackやメールで通知するツールを登録。

まとめと次のステップ

この第2章では、SupabaseのリアルタイムサブスクリプションをMCPサーバーに統合し、データ変更を即座にAIに通知する機能を実装しました。これにより、エージェントAIが動的なデータに対応できるようになりました。

次の第3章では、Supabaseのデータを活用してデータ分析エージェントを構築します。たとえば、プロジェクトの進捗や傾向をAIが分析し、提案を生成します。リアルタイムデータ分析に興味がある方は、ぜひお楽しみに!


役に立ったと思ったら、「いいね」や「ストック」をしていただけると嬉しいです!次の章でまたお会いしましょう!

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?