はじめに
第3章では、ZapierのZapsと実行履歴を活用してワークフロー分析エージェントを構築しました。実行頻度やエラーレートを分析することで、AIがワークフローのインサイトを提供し、プロセス改善を支援できるようになりました。今回は、ZapierのWebhook機能を利用して、Zapsの実行やエラーイベントをリアルタイムで検知するリアルタイム管理AIを構築します。
この第4章では、MCPサーバーにZapierのWebhookを統合し、Zapsの実行、成功、失敗などのイベントを即座に捕捉します。AIはこれを利用して、自動で通知を送信したり、エラー発生時に代替アクションを提案したりできます。コード例とステップごとのガイドで、リアルタイム管理AIの構築を体験しましょう。さあ、始めましょう!
リアルタイム管理AIとは?
リアルタイム管理AIは、Zapierのイベント(Zaps実行、成功、失敗、トリガー発火など)を監視し、ワークフロー管理を動的に支援するエージェントです。MCPサーバーとZapierのWebhookを組み合わせることで、以下のような機能を実現できます:
- イベント検知:Zapsの実行やエラーをリアルタイムで捕捉。
- 自動応答:エラーイベントに基づいて通知や代替Zapsを提案。
- 外部連携:イベントをSlackやメールに通知し、コラボレーションを強化。
ユースケース
- ワークフロー監視:AIがZapsの失敗を検知し、Slackに通知。
- エラー対応:AIが失敗したZapsを一時停止し、代替ワークフローを提案。
- プロセス最適化:AIが頻繁に実行されるZapsを検出し、効率化を提案。
開発環境の準備
第3章の環境を基に、以下の追加準備を行います:
- Python 3.8以降、mcpライブラリ、requestsライブラリ、Claude Desktop:これまでと同じ。
- python-dotenv:環境変数の管理(既にインストール済み)。
- ngrok:ローカルサーバーを公開し、Webhookを受信。
- Zapierアカウント:Webhook対応の設定。
インストールコマンド(必要に応じて):
pip install requests python-dotenv
Zapierのセットアップ
-
Zapier APIトークンの確認:
- 第3章のトークンを使用。
- 権限を確認:Zaps操作(読み書き)、Webhook管理。
-
Webhookの設定:
- ZapierのDeveloper PlatformまたはZaps内でWebhookアプリを使用。
- 新しいZapsを作成し、Webhookトリガーを設定:
- トリガー:Zapierの「Catch Hook」(カスタムWebhook)。
- アクション:テスト用(例:Slackに通知、Google Sheetsに記録)。
- Webhook URLを後で取得(ngrok経由)。
-
Zapierワークフロー準備:
- テスト用Zapsを追加(例:Gmail→Trello、Slack→Google Sheets)。
- Zapsを実行し、成功・失敗イベントを生成(例:無効な接続で失敗)。
-
ngrokの設定:
- ngrokをインストール(
brew install ngrok
または公式サイト)。 - ローカルサーバーを公開:
ngrok http 8140
- 生成されたURLを記録(例:
https://abc123.ngrok.io
)。 - ZapierのWebhook設定にURLを設定(例:
https://abc123.ngrok.io/webhook
)。
- ngrokをインストール(
-
環境変数:
第3章の.env
ファイルに以下を追加:ZAPIER_API_TOKEN=your_zapier_api_token ZAPIER_WEBHOOK_SECRET=mysecret
コード例:リアルタイム管理用MCPサーバー
以下のMCPサーバーは、ZapierのWebhookを介してイベントを検知し、通知やZaps操作の機能を提供します。
from mcp import MCPServer
import os
from dotenv import load_dotenv
import requests
from http.server import BaseHTTPRequestHandler, HTTPServer
import json
import hmac
import hashlib
import threading
class ZapierRealtimeServer(MCPServer):
def __init__(self, host, port, api_token, webhook_secret):
super().__init__(host, port)
self.api_token = api_token
self.webhook_secret = webhook_secret
self.base_url = "https://api.zapier.com/v1"
self.headers = {
"Authorization": f"Bearer {api_token}",
"Accept": "application/json",
"Content-Type": "application/json"
}
self.latest_event = None
self.register_resource("get_latest_event", self.get_latest_event)
self.register_tool("send_notification", self.send_notification)
self.start_webhook_server()
def verify_webhook_signature(self, body, signature):
try:
computed_sig = "sha256=" + hmac.new(
self.webhook_secret.encode("utf-8"),
body.encode("utf-8"),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(computed_sig, signature)
except Exception as e:
return False
def send_notification(self, params):
try:
zap_id = params.get("zap_id", "")
message = params.get("message", "")
if not zap_id or not message:
return {"status": "error", "message": "Zaps IDとメッセージが必要です"}
# 例:Slackに通知(実際のアクションはZapier Zapsで設定)
url = f"{self.base_url}/zaps/{zap_id}/trigger"
payload = {"message": message}
response = requests.post(url, headers=self.headers, json=payload)
response.raise_for_status()
return {"status": "success", "notification_sent": True}
except Exception as e:
return {"status": "error", "message": str(e)}
def get_latest_event(self, params):
try:
if self.latest_event:
event_info = {
"event_type": self.latest_event.get("event_type", ""),
"zap_id": self.latest_event.get("zap_id", ""),
"status": self.latest_event.get("status", ""),
"timestamp": self.latest_event.get("timestamp", ""),
"details": self.latest_event.get("details", {})
}
return {"status": "success", "event_info": event_info}
return {"status": "success", "event_info": None, "message": "イベントなし"}
except Exception as e:
return {"status": "error", "message": str(e)}
def start_webhook_server(self):
class WebhookHandler(BaseHTTPRequestHandler):
def do_POST(self):
content_length = int(self.headers["Content-Length"])
post_data = self.rfile.read(content_length)
signature = self.headers.get("X-Zapier-Signature", "")
# Webhook署名検証
if not self.server.parent.verify_webhook_signature(post_data.decode("utf-8"), signature):
self.send_response(401)
self.end_headers()
self.wfile.write(b"Invalid signature")
return
data = json.loads(post_data.decode("utf-8"))
event_type = data.get("event_type", "unknown")
zap_id = data.get("zap_id", "")
# イベント処理
if event_type in ["zap_execution", "zap_error"]:
self.server.parent.latest_event = {
"event_type": event_type,
"zap_id": zap_id,
"status": data.get("status", ""),
"timestamp": data.get("timestamp", ""),
"details": data.get("details", {})
}
self.send_response(200)
self.end_headers()
self.wfile.write(b"Webhook received")
server = HTTPServer(("localhost", 8140), WebhookHandler)
server.parent = self
threading.Thread(target=server.serve_forever, daemon=True).start()
print("Webhookサーバーを起動中: http://localhost:8140")
if __name__ == "__main__":
load_dotenv()
server = ZapierRealtimeServer(
host="localhost",
port=8140,
api_token=os.getenv("ZAPIER_API_TOKEN"),
webhook_secret=os.getenv("ZAPIER_WEBHOOK_SECRET")
)
print("ZapierリアルタイムMCPサーバーを起動中: http://localhost:8140")
server.start()
コードの説明
-
verify_webhook_signature:ZapierのWebhook署名(
X-Zapier-Signature
)をHMAC-SHA256で検証。 - send_notification:指定したZapsに通知を送信(例:SlackやGoogle Sheetsにメッセージ)。
- get_latest_event:Webhookで受信した最新イベント(Zaps実行、失敗など)を取得。
- start_webhook_server:ローカルWebhookサーバーを起動し、Zapierからのイベントを処理。
- WebhookHandler:Zapierのイベント(zap_execution、zap_error)を受信し、署名検証を行う。
前提条件
- ZapierにWebhookトリガーが設定済み(例:Catch Hookでイベント捕捉)。
- ngrokでWebhook URLが公開され、Zapierに登録済み。
-
.env
ファイルに正しいZAPIER_API_TOKEN
とZAPIER_WEBHOOK_SECRET
が設定済み。 - APIトークンにZapsへの読み書き権限がある。
サーバーのテスト
サーバーが正しく動作するか確認します:
-
ngrok起動:
ngrok http 8140
ngrok URL(例:
https://abc123.ngrok.io
)を記録し、ZapierのWebhook設定に設定(例:https://abc123.ngrok.io/webhook
)。 -
サーバー起動:
python zapier_realtime_server.py
コンソールに「ZapierリアルタイムMCPサーバーを起動中: http://localhost:8140」と「Webhookサーバーを起動中: http://localhost:8140」が表示。
-
イベント検知のテスト:
- ZapierのUIでZapsを実行(例:Gmailでメール受信)またはエラーを発生させる。
- サーバーがイベントをWebhook経由で検知。
-
最新イベント取得のテスト:
Pythonでリクエストを送信:import requests import json url = "http://localhost:8140" payload = { "jsonrpc": "2.0", "method": "get_latest_event", "params": {}, "id": 1 } response = requests.post(url, json=payload) print(json.dumps(response.json(), indent=2, ensure_ascii=False))
期待されるレスポンス:
{ "jsonrpc": "2.0", "result": { "status": "success", "event_info": { "event_type": "zap_execution", "zap_id": "12347", "status": "success", "timestamp": "2025-04-22T14:00:00Z", "details": {"trigger": "New Email", "action": "Create Card"} } }, "id": 1 }
-
通知送信のテスト:
payload = { "jsonrpc": "2.0", "method": "send_notification", "params": { "zap_id": "12347", "message": "Zaps実行が成功しました!" }, "id": 2 } response = requests.post(url, json=payload) print(json.dumps(response.json(), indent=2, ensure_ascii=False))
期待されるレスポンス:
{ "jsonrpc": "2.0", "result": { "status": "success", "notification_sent": true }, "id": 2 }
Claude Desktopとの接続
サーバーをClaude Desktopに接続します:
-
設定ファイルの編集:
Claude Desktopの設定ファイル(例:claude_desktop_config.json
)に以下を追加:{ "mcp_servers": [ { "name": "ZapierRealtimeServer", "url": "http://localhost:8140", "auth": "none" } ] }
-
Claudeでテスト:
Claude Desktopを起動し、プロンプトを入力:最新のZapsイベントを教えてください。
レスポンス例:
最新のZapsイベント: - イベント:Zaps実行 - Zaps ID:12347 - ステータス:成功 - タイムスタンプ:2025-04-22 14:00 - 詳細:トリガー(New Email)、アクション(Create Card)
別のプロンプト:
Zaps ID 12347に「Zaps実行が成功しました!」と通知してください。
レスポンス例:
Zaps ID 12347に通知を送信しました。
実装のコツと注意点
-
Webhook検証:Zapierの署名検証(
X-Zapier-Signature
)を正しく処理。 - レートリミティング:Zapier APIの制限(例:100リクエスト/分、プランによる)に注意。
- セキュリティ:本番環境では、HTTPSを有効化し、Webhookシークレットを必須化。
- テスト:テスト用Zapsを作成し、本番ワークフローに影響を与えない。
- 拡張性:大量のイベントを処理する場合、キュー(例:Redis、RabbitMQ)を検討。
試してみよう:挑戦課題
以下の機能を追加して、エージェントを強化してみてください:
- エラーイベント検知時に代替Zapsを自動作成。
- WebhookイベントをGoogle Sheetsに記録する機能。
- イベントデータを分析し、Zapsの実行頻度を最適化。
まとめと次のステップ
この第4章では、ZapierのWebhookを活用してリアルタイム管理AIを構築しました。Zapsの実行やエラーイベントをリアルタイムで検知し、AIが通知や応答を生成できるようになりました。
次の第5章では、MCPサーバーの最適化とコミュニティへの貢献に焦点を当てます。サーバーのパフォーマンス向上、セキュリティ強化、そしてZapier用MCPサーバーをオープンソースとして共有する方法を学びます。コミュニティAIの未来に興味がある方は、ぜひお楽しみに!
役に立ったと思ったら、「いいね」や「ストック」をしていただけると嬉しいです!次の章でまたお会いしましょう!