1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ZapierでAIを強化する | 第4章:リアルタイム管理AIを強化:Webhook連携

Posted at

はじめに

第3章では、ZapierのZapsと実行履歴を活用してワークフロー分析エージェントを構築しました。実行頻度やエラーレートを分析することで、AIがワークフローのインサイトを提供し、プロセス改善を支援できるようになりました。今回は、ZapierのWebhook機能を利用して、Zapsの実行やエラーイベントをリアルタイムで検知するリアルタイム管理AIを構築します。

この第4章では、MCPサーバーにZapierのWebhookを統合し、Zapsの実行、成功、失敗などのイベントを即座に捕捉します。AIはこれを利用して、自動で通知を送信したり、エラー発生時に代替アクションを提案したりできます。コード例とステップごとのガイドで、リアルタイム管理AIの構築を体験しましょう。さあ、始めましょう!

リアルタイム管理AIとは?

リアルタイム管理AIは、Zapierのイベント(Zaps実行、成功、失敗、トリガー発火など)を監視し、ワークフロー管理を動的に支援するエージェントです。MCPサーバーとZapierのWebhookを組み合わせることで、以下のような機能を実現できます:

  • イベント検知:Zapsの実行やエラーをリアルタイムで捕捉。
  • 自動応答:エラーイベントに基づいて通知や代替Zapsを提案。
  • 外部連携:イベントをSlackやメールに通知し、コラボレーションを強化。

ユースケース

  • ワークフロー監視:AIがZapsの失敗を検知し、Slackに通知。
  • エラー対応:AIが失敗したZapsを一時停止し、代替ワークフローを提案。
  • プロセス最適化:AIが頻繁に実行されるZapsを検出し、効率化を提案。

開発環境の準備

第3章の環境を基に、以下の追加準備を行います:

  • Python 3.8以降mcpライブラリrequestsライブラリClaude Desktop:これまでと同じ。
  • python-dotenv:環境変数の管理(既にインストール済み)。
  • ngrok:ローカルサーバーを公開し、Webhookを受信。
  • Zapierアカウント:Webhook対応の設定。

インストールコマンド(必要に応じて):

pip install requests python-dotenv

Zapierのセットアップ

  1. Zapier APIトークンの確認
    • 第3章のトークンを使用。
    • 権限を確認:Zaps操作(読み書き)、Webhook管理。
  2. Webhookの設定
    • ZapierのDeveloper PlatformまたはZaps内でWebhookアプリを使用。
    • 新しいZapsを作成し、Webhookトリガーを設定:
      • トリガー:Zapierの「Catch Hook」(カスタムWebhook)。
      • アクション:テスト用(例:Slackに通知、Google Sheetsに記録)。
    • Webhook URLを後で取得(ngrok経由)。
  3. Zapierワークフロー準備
    • テスト用Zapsを追加(例:Gmail→Trello、Slack→Google Sheets)。
    • Zapsを実行し、成功・失敗イベントを生成(例:無効な接続で失敗)。
  4. ngrokの設定
    • ngrokをインストール(brew install ngrokまたは公式サイト)。
    • ローカルサーバーを公開:
      ngrok http 8140
      
    • 生成されたURLを記録(例:https://abc123.ngrok.io)。
    • ZapierのWebhook設定にURLを設定(例:https://abc123.ngrok.io/webhook)。
  5. 環境変数
    第3章の.envファイルに以下を追加:
    ZAPIER_API_TOKEN=your_zapier_api_token
    ZAPIER_WEBHOOK_SECRET=mysecret
    

コード例:リアルタイム管理用MCPサーバー

以下のMCPサーバーは、ZapierのWebhookを介してイベントを検知し、通知やZaps操作の機能を提供します。

from mcp import MCPServer
import os
from dotenv import load_dotenv
import requests
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
import hmac
import hashlib
import threading

class ZapierRealtimeServer(MCPServer):
    def __init__(self, host, port, api_token, webhook_secret):
        super().__init__(host, port)
        self.api_token = api_token
        self.webhook_secret = webhook_secret
        self.base_url = "https://api.zapier.com/v1"
        self.headers = {
            "Authorization": f"Bearer {api_token}",
            "Accept": "application/json",
            "Content-Type": "application/json"
        }
        self.latest_event = None
        self.register_resource("get_latest_event", self.get_latest_event)
        self.register_tool("send_notification", self.send_notification)
        self.start_webhook_server()

    def verify_webhook_signature(self, body, signature):
        try:
            computed_sig = "sha256=" + hmac.new(
                self.webhook_secret.encode("utf-8"),
                body.encode("utf-8"),
                hashlib.sha256
            ).hexdigest()
            return hmac.compare_digest(computed_sig, signature)
        except Exception as e:
            return False

    def send_notification(self, params):
        try:
            zap_id = params.get("zap_id", "")
            message = params.get("message", "")
            if not zap_id or not message:
                return {"status": "error", "message": "Zaps IDとメッセージが必要です"}
            
            # 例:Slackに通知(実際のアクションはZapier Zapsで設定)
            url = f"{self.base_url}/zaps/{zap_id}/trigger"
            payload = {"message": message}
            response = requests.post(url, headers=self.headers, json=payload)
            response.raise_for_status()
            return {"status": "success", "notification_sent": True}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def get_latest_event(self, params):
        try:
            if self.latest_event:
                event_info = {
                    "event_type": self.latest_event.get("event_type", ""),
                    "zap_id": self.latest_event.get("zap_id", ""),
                    "status": self.latest_event.get("status", ""),
                    "timestamp": self.latest_event.get("timestamp", ""),
                    "details": self.latest_event.get("details", {})
                }
                return {"status": "success", "event_info": event_info}
            return {"status": "success", "event_info": None, "message": "イベントなし"}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def start_webhook_server(self):
        class WebhookHandler(BaseHTTPRequestHandler):
            def do_POST(self):
                content_length = int(self.headers["Content-Length"])
                post_data = self.rfile.read(content_length)
                signature = self.headers.get("X-Zapier-Signature", "")

                # Webhook署名検証
                if not self.server.parent.verify_webhook_signature(post_data.decode("utf-8"), signature):
                    self.send_response(401)
                    self.end_headers()
                    self.wfile.write(b"Invalid signature")
                    return

                data = json.loads(post_data.decode("utf-8"))
                event_type = data.get("event_type", "unknown")
                zap_id = data.get("zap_id", "")

                # イベント処理
                if event_type in ["zap_execution", "zap_error"]:
                    self.server.parent.latest_event = {
                        "event_type": event_type,
                        "zap_id": zap_id,
                        "status": data.get("status", ""),
                        "timestamp": data.get("timestamp", ""),
                        "details": data.get("details", {})
                    }

                self.send_response(200)
                self.end_headers()
                self.wfile.write(b"Webhook received")

        server = HTTPServer(("localhost", 8140), WebhookHandler)
        server.parent = self
        threading.Thread(target=server.serve_forever, daemon=True).start()
        print("Webhookサーバーを起動中: http://localhost:8140")

if __name__ == "__main__":
    load_dotenv()
    server = ZapierRealtimeServer(
        host="localhost",
        port=8140,
        api_token=os.getenv("ZAPIER_API_TOKEN"),
        webhook_secret=os.getenv("ZAPIER_WEBHOOK_SECRET")
    )
    print("ZapierリアルタイムMCPサーバーを起動中: http://localhost:8140")
    server.start()

コードの説明

  • verify_webhook_signature:ZapierのWebhook署名(X-Zapier-Signature)をHMAC-SHA256で検証。
  • send_notification:指定したZapsに通知を送信(例:SlackやGoogle Sheetsにメッセージ)。
  • get_latest_event:Webhookで受信した最新イベント(Zaps実行、失敗など)を取得。
  • start_webhook_server:ローカルWebhookサーバーを起動し、Zapierからのイベントを処理。
  • WebhookHandler:Zapierのイベント(zap_execution、zap_error)を受信し、署名検証を行う。

前提条件

  • ZapierにWebhookトリガーが設定済み(例:Catch Hookでイベント捕捉)。
  • ngrokでWebhook URLが公開され、Zapierに登録済み。
  • .envファイルに正しいZAPIER_API_TOKENZAPIER_WEBHOOK_SECRETが設定済み。
  • APIトークンにZapsへの読み書き権限がある。

サーバーのテスト

サーバーが正しく動作するか確認します:

  1. ngrok起動

    ngrok http 8140
    

    ngrok URL(例:https://abc123.ngrok.io)を記録し、ZapierのWebhook設定に設定(例:https://abc123.ngrok.io/webhook)。

  2. サーバー起動

    python zapier_realtime_server.py
    

    コンソールに「ZapierリアルタイムMCPサーバーを起動中: http://localhost:8140」と「Webhookサーバーを起動中: http://localhost:8140」が表示。

  3. イベント検知のテスト

    • ZapierのUIでZapsを実行(例:Gmailでメール受信)またはエラーを発生させる。
    • サーバーがイベントをWebhook経由で検知。
  4. 最新イベント取得のテスト
    Pythonでリクエストを送信:

    import requests
    import json
    
    url = "http://localhost:8140"
    payload = {
        "jsonrpc": "2.0",
        "method": "get_latest_event",
        "params": {},
        "id": 1
    }
    response = requests.post(url, json=payload)
    print(json.dumps(response.json(), indent=2, ensure_ascii=False))
    

    期待されるレスポンス:

    {
      "jsonrpc": "2.0",
      "result": {
        "status": "success",
        "event_info": {
          "event_type": "zap_execution",
          "zap_id": "12347",
          "status": "success",
          "timestamp": "2025-04-22T14:00:00Z",
          "details": {"trigger": "New Email", "action": "Create Card"}
        }
      },
      "id": 1
    }
    
  5. 通知送信のテスト

    payload = {
        "jsonrpc": "2.0",
        "method": "send_notification",
        "params": {
            "zap_id": "12347",
            "message": "Zaps実行が成功しました!"
        },
        "id": 2
    }
    response = requests.post(url, json=payload)
    print(json.dumps(response.json(), indent=2, ensure_ascii=False))
    

    期待されるレスポンス:

    {
      "jsonrpc": "2.0",
      "result": {
        "status": "success",
        "notification_sent": true
      },
      "id": 2
    }
    

Claude Desktopとの接続

サーバーをClaude Desktopに接続します:

  1. 設定ファイルの編集
    Claude Desktopの設定ファイル(例:claude_desktop_config.json)に以下を追加:

    {
      "mcp_servers": [
        {
          "name": "ZapierRealtimeServer",
          "url": "http://localhost:8140",
          "auth": "none"
        }
      ]
    }
    
  2. Claudeでテスト
    Claude Desktopを起動し、プロンプトを入力:

    最新のZapsイベントを教えてください。
    

    レスポンス例:

    最新のZapsイベント:
    - イベント:Zaps実行
    - Zaps ID:12347
    - ステータス:成功
    - タイムスタンプ:2025-04-22 14:00
    - 詳細:トリガー(New Email)、アクション(Create Card)
    

    別のプロンプト:

    Zaps ID 12347に「Zaps実行が成功しました!」と通知してください。
    

    レスポンス例:

    Zaps ID 12347に通知を送信しました。
    

実装のコツと注意点

  • Webhook検証:Zapierの署名検証(X-Zapier-Signature)を正しく処理。
  • レートリミティング:Zapier APIの制限(例:100リクエスト/分、プランによる)に注意。
  • セキュリティ:本番環境では、HTTPSを有効化し、Webhookシークレットを必須化。
  • テスト:テスト用Zapsを作成し、本番ワークフローに影響を与えない。
  • 拡張性:大量のイベントを処理する場合、キュー(例:Redis、RabbitMQ)を検討。

試してみよう:挑戦課題

以下の機能を追加して、エージェントを強化してみてください:

  • エラーイベント検知時に代替Zapsを自動作成。
  • WebhookイベントをGoogle Sheetsに記録する機能。
  • イベントデータを分析し、Zapsの実行頻度を最適化。

まとめと次のステップ

この第4章では、ZapierのWebhookを活用してリアルタイム管理AIを構築しました。Zapsの実行やエラーイベントをリアルタイムで検知し、AIが通知や応答を生成できるようになりました。

次の第5章では、MCPサーバーの最適化とコミュニティへの貢献に焦点を当てます。サーバーのパフォーマンス向上、セキュリティ強化、そしてZapier用MCPサーバーをオープンソースとして共有する方法を学びます。コミュニティAIの未来に興味がある方は、ぜひお楽しみに!


役に立ったと思ったら、「いいね」や「ストック」をしていただけると嬉しいです!次の章でまたお会いしましょう!

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?