1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

NotionでAIを強化する | 第5章:最適化とコミュニティへの貢献

Posted at

はじめに

これまでの第1章から第4章では、NotionModel Context Protocol(MCP)を活用してAIエージェントを構築しました。ページデータの取得、タスク自動化、データベース分析、リアルタイム管理を通じて、NotionのAPIとMCPの柔軟性を最大限に引き出しました。この最終章では、MCPサーバーの最適化セキュリティ強化、そしてコミュニティへの貢献に焦点を当てます。

この第5章では、Notion用MCPサーバーのパフォーマンスを向上させ、安全性を確保する方法を解説します。さらに、サーバーをオープンソースとして共有し、MCPコミュニティに貢献する方法を探ります。コード例を通じて、キャッシュとセキュリティログの実装も学びます。NotionとMCPの未来を一緒に切り開きましょう!

MCPサーバーの最適化

Notion用MCPサーバーを本番環境で運用するには、以下の最適化が必要です:

1. キャッシュの活用

  • 目的:頻繁なデータ取得(例:データベースエントリや更新データ)を高速化。
  • 方法:Redisなどのインメモリキャッシュを使用。
  • :データベースエントリを5分間キャッシュし、Notion APIへのリクエストを削減。

2. レートリミティング

  • 目的:Notion APIの制限(通常3リクエスト/秒)を遵守し、DoS攻撃を防止。
  • 方法:リクエストごとに制限を設定(例:1分間に50リクエスト)。
  • ratelimitライブラリを使用して制限を実装。

3. 非同期処理

  • 目的:Webhookシミュレーションや大量のリクエストを効率的に処理。
  • 方法asyncioやメッセージキュー(例:RabbitMQ)を活用。
  • :データベース更新チェックをキューイングし、レスポンス時間を短縮。

セキュリティ強化

リアルタイムAIを安全に運用するには、以下のセキュリティ対策が重要です:

1. HTTPSの有効化

  • 目的:通信データを暗号化し、盗聴を防止。
  • 方法:Let’s EncryptやクラウドプロバイダーのSSL証明書を使用。
  • 推奨事項:本番環境ではTLS 1.3を採用。

2. APIトークン認証

  • 目的:NotionやMCPサーバーへのアクセスを制限。
  • 方法:NotionのAPIトークンを安全に管理し、Webhookリクエストに認証を追加。
  • :リクエストごとにトークンを検証。

3. セキュリティログ

  • 目的:不正アクセスやエラーを追跡。
  • 方法:リクエストとレスポンスをログに記録。

コード例:最適化とセキュリティログの実装

以下のコードは、第4章のリアルタイム管理サーバーにキャッシュ(Redis)とセキュリティログを追加した例です:

from mcp import MCPServer
import os
from dotenv import load_dotenv
import requests
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
import redis
import logging
from datetime import datetime
import threading

# ログ設定
logging.basicConfig(
    filename="notion_realtime_server.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

class OptimizedNotionRealtimeServer(MCPServer):
    def __init__(self, host, port, token, database_id, redis_host, redis_port):
        super().__init__(host, port)
        self.token = token
        self.database_id = database_id
        self.base_url = "https://api.notion.com/v1"
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Notion-Version": "2022-06-28",
            "Content-Type": "application/json"
        }
        self.redis = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.logger = logging.getLogger(__name__)
        self.latest_update = None
        self.last_checked = None
        self.register_resource("get_latest_update", self.get_latest_update)
        self.register_tool("add_comment", self.add_comment)
        self.start_webhook_server()

    def check_for_updates(self):
        try:
            cache_key = f"notion_db:{self.database_id}"
            cached = self.redis.get(cache_key)
            if cached:
                self.logger.info(f"キャッシュヒット: キー={cache_key}")
                return json.loads(cached)

            url = f"{self.base_url}/databases/{self.database_id}/query"
            response = requests.post(url, headers=self.headers, json={})
            response.raise_for_status()
            entries = response.json()["results"]
            for entry in entries:
                last_edited = entry["last_edited_time"]
                if not self.last_checked or last_edited > self.last_checked:
                    self.latest_update = {
                        "action": {
                            "data": {
                                "entry": {
                                    "id": entry["id"],
                                    "name": entry["properties"].get("Name", {}).get("title", [{}])[0].get("plain_text", ""),
                                    "status": entry["properties"].get("Status", {}).get("select", {}).get("name", "")
                                }
                            },
                            "type": "updateEntry",
                            "date": last_edited
                        }
                    }
                    self.last_checked = last_edited
            self.redis.setex(cache_key, 300, json.dumps({"status": "success", "entries": entries}))
            return {"status": "success", "entries": entries}
        except Exception as e:
            self.logger.error(f"更新チェック失敗: エラー={str(e)}")
            return {"status": "error", "message": str(e)}

    def get_latest_update(self, params):
        request_id = datetime.now().isoformat()
        self.logger.info(f"リクエスト受信 [ID: {request_id}]: パラメータ={params}")
        try:
            self.check_for_updates()
            if self.latest_update:
                action = self.latest_update["action"]
                update_info = {
                    "entry_id": action["data"]["entry"]["id"],
                    "entry_name": action["data"]["entry"]["name"],
                    "action_type": action["type"],
                    "status": action["data"]["entry"]["status"],
                    "date": action["date"]
                }
                self.logger.info(f"リクエスト成功 [ID: {request_id}]: エントリ={update_info['entry_id']}")
                return {"status": "success", "update_info": update_info}
            self.logger.info(f"リクエスト成功 [ID: {request_id}]: 更新なし")
            return {"status": "success", "update_info": None, "message": "更新なし"}
        except Exception as e:
            self.logger.error(f"リクエスト失敗 [ID: {request_id}]: エラー={str(e)}")
            return {"status": "error", "message": str(e)}

    def add_comment(self, params):
        request_id = datetime.now().isoformat()
        self.logger.info(f"リクエスト受信 [ID: {request_id}]: パラメータ={params}")
        try:
            entry_id = params.get("entry_id", "")
            message = params.get("message", "")
            if not entry_id or not message:
                self.logger.warning(f"リクエスト失敗 [ID: {request_id}]: エントリIDとコメントが必要です")
                return {"status": "error", "message": "エントリIDとコメントが必要です"}
            
            url = f"{self.base_url}/comments"
            payload = {
                "parent": {"page_id": entry_id},
                "rich_text": [{"text": {"content": message}}]
            }
            response = requests.post(url, headers=self.headers, json=payload)
            response.raise_for_status()
            comment = response.json()
            self.logger.info(f"リクエスト成功 [ID: {request_id}]: コメントID={comment['id']}")
            return {"status": "success", "comment_id": comment["id"]}
        except Exception as e:
            self.logger.error(f"リクエスト失敗 [ID: {request_id}]: エラー={str(e)}")
            return {"status": "error", "message": str(e)}

    def start_webhook_server(self):
        class WebhookHandler(BaseHTTPRequestHandler):
            def do_POST(self):
                content_length = int(self.headers["Content-Length"])
                post_data = self.rfile.read(content_length)
                self.server.parent.latest_update = json.loads(post_data.decode("utf-8"))
                self.server.parent.logger.info(f"Webhook受信: データ={self.server.parent.latest_update}")
                self.send_response(200)
                self.end_headers()
                self.wfile.write(b"Webhook received")

        server = HTTPServer(("localhost", 8121), WebhookHandler)
        server.parent = self
        threading.Thread(target=server.serve_forever, daemon=True).start()
        print("Webhookサーバーを起動中: http://localhost:8121")

if __name__ == "__main__":
    load_dotenv()
    server = OptimizedNotionRealtimeServer(
        host="localhost",
        port=8121,
        token=os.getenv("NOTION_TOKEN"),
        database_id=os.getenv("NOTION_DATABASE_ID"),
        redis_host=os.getenv("REDIS_HOST", "localhost"),
        redis_port=int(os.getenv("REDIS_PORT", 6379))
    )
    print("最適化NotionリアルタイムMCPサーバーを起動中: http://localhost:8121")
    server.start()

コードの説明

  • Redisキャッシュ:データベースエントリをキャッシュ(5分間有効)。setexで有効期限を設定。
  • セキュリティログ:リクエスト、Webhook受信、成功、失敗をnotion_realtime_server.logに記録。
  • check_for_updates:データベース更新をチェックし、キャッシュを利用。
  • get_latest_update:キャッシュまたは最新のデータから更新情報を取得。
  • add_comment:エントリにコメントを追加し、ログを記録。

前提条件

  • Redisサーバーがローカルまたはクラウドで稼働(例:docker run -p 6379:6379 redis)。
  • .envファイルにNOTION_TOKENNOTION_DATABASE_IDREDIS_HOSTREDIS_PORTが設定済み。
  • Notionデータベースにエントリが存在。
  • APIトークンに読み書きおよびコメント権限がある。

サーバーのテスト

サーバーが正しく動作するか確認します:

  1. Redis起動

    docker run -p 6379:6379 redis
    
  2. ngrok起動

    ngrok http 8121
    

    ngrok URL(例:https://abc123.ngrok.io)を記録。

  3. サーバー起動

    python optimized_notion_realtime_server.py
    

    コンソールに「最適化NotionリアルタイムMCPサーバーを起動中: http://localhost:8121」と「Webhookサーバーを起動中: http://localhost:8121」が表示。

  4. 最新更新取得のテスト
    Pythonでリクエストを送信:

    import requests
    import json
    
    url = "http://localhost:8121"
    payload = {
        "jsonrpc": "2.0",
        "method": "get_latest_update",
        "params": {},
        "id": 1
    }
    response = requests.post(url, json=payload)
    print(json.dumps(response.json(), indent=2, ensure_ascii=False))
    

    期待されるレスポンス:

    {
      "jsonrpc": "2.0",
      "result": {
        "status": "success",
        "update_info": {
          "entry_id": "entry123",
          "entry_name": "コードレビュー",
          "action_type": "updateEntry",
          "status": "In Progress",
          "date": "2025-04-22T10:00:00.000Z"
        }
      },
      "id": 1
    }
    
  5. ログ確認
    notion_realtime_server.logに以下のような記録が残る:

    2025-04-22 23:30:00,123 - INFO - リクエスト受信 [ID: 2025-04-22T23:30:00.123456]: パラメータ={}
    2025-04-22 23:30:00,125 - INFO - キャッシュヒット: キー=notion_db:db123
    

コミュニティへの貢献

Notion用MCPサーバーをオープンソースとして共有することで、コミュニティに貢献できます。以下のステップで進めます:

1. GitHubでの公開

  • リポジトリ作成:GitHubに新しいリポジトリを作成(例:notion-mcp-server)。

  • コード整理:モジュール化し、再利用可能な構造にする。

  • README:インストール手順、使い方、例を記載。

    # Notion MCP Server
    NotionとMCPを統合し、AIエージェントを構築するサーバーです。
    
    ## インストール
    ```bash
    pip install mcp requests redis
    

    使い方

    1. .envNOTION_TOKENNOTION_DATABASE_IDを設定。
    2. python server.pyで起動。
  • ライセンス:MITやApache 2.0など、オープンソースライセンスを選択。

2. ドキュメントの提供

  • Qiita記事:このシリーズのようなチュートリアルを共有。
  • Notionコミュニティ:NotionのフォーラムやRedditでプロジェクトを紹介。
  • MCPコミュニティ:MCPのGitHub IssuesやDiscordでサーバーを提案。

3. フィードバックの収集

  • Issueトラッキング:バグ報告や機能リクエストを受け付ける。
  • プルリクエスト:他の開発者からの貢献を歓迎。
  • 改善の継続:コミュニティのフィードバックを基にサーバーを更新。

NotionとMCPの未来

NotionとMCPの組み合わせは、AIをプロジェクト管理やノート管理の強力なツールに変える可能性を秘めています。以下は、長期的なビジョンです:

1. ネイティブ統合

  • ビジョン:NotionがMCPをネイティブサポートし、データベース設定からMCPサーバーを直接接続。
  • :NotionのインテグレーションにMCPエージェントを追加。

2. エンタープライズ採用

  • ビジョン:企業がNotionとMCPを使って、プロジェクト管理やデータ分析をスケール。
  • :AIが複数データベースのデータを統合し、企業全体の進捗を分析。

3. パーソナライズドAI

  • ビジョン:個人がNotionにプライベートデータベースを作成し、MCP経由でカスタムAIを利用。
  • :個人タスクをAIが優先順位付けし、最適なスケジュールを提案。

シリーズのまとめ

このシリーズを通じて、NotionとMCPを活用したAIエージェントの構築を以下のように学びました:

  • 第1章:NotionとMCPの基本、ページデータ取得。
  • 第2章:タスク自動化でページやデータベースエントリを効率化。
  • 第3章:データ分析エージェントで進捗やボトルネックを評価。
  • 第4章:リアルタイム管理AIでデータベース更新を動的監視。
  • 第5章:サーバーの最適化、セキュリティ、コミュニティ貢献。

Notionの柔軟なデータベース管理とMCPの接続性は、AIをノート管理やプロジェクト管理の強力なアシスタントに変えます。あなたもこのサーバーを試し、コミュニティで共有して、AIエージェントの未来を共創しませんか?


役に立ったと思ったら、「いいね」や「ストック」をしていただけると嬉しいです!次の挑戦でまたお会いしましょう!

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?