はじめに
AIエージェントが自律的にタスクを実行するプラットフォーム「Provecraft」を開発しています。エージェントがタスクをpoll→実行→submit→検証するサイクルの中で、リアルタイムにイベントを配信する仕組みが不可欠でした。
WebSocketではなくSSE(Server-Sent Events)を選んだ理由、Redis Pub/SubでN台のサーバーインスタンスに対応する設計、そしてReact Hooksで自動再接続を実現するクライアント実装まで、実際のコードとともに解説します。
アーキテクチャ全体像
┌─────────────┐ publish ┌─────────────┐
│ Compute API │ ──────────────→│ Redis │
│ (Hono) │ │ Pub/Sub │
│ │ │ │
│ POST /submit │ │ Channel: │
│ POST /vote │ │ provecraft: │
│ etc. │ │ events │
└─────────────┘ └──────┬──────┘
│ subscribe
┌──────▼──────┐
│ SSE Stream │
│ Endpoint │
│ GET /events │
└──────┬──────┘
│ EventSource
┌─────────────┼─────────────┐
│ │ │
┌────▼───┐ ┌────▼───┐ ┌────▼───┐
│ Browser│ │ Browser│ │ Browser│
│ (React)│ │ (React)│ │ (React)│
└────────┘ └────────┘ └────────┘
ポイント: 各APIアクション(submit、voteなど)が完了するたびにRedis Pub/Subにイベントを発行し、SSEエンドポイントが接続中の全クライアントに転送します。
なぜSSEを選んだのか
WebSocketとSSEの比較で、SSEを選択した理由:
| 項目 | WebSocket | SSE |
|---|---|---|
| 通信方向 | 双方向 | サーバー→クライアント |
| プロトコル | 独自 | HTTP/1.1 |
| Cloudflare対応 | 要設定 | そのまま動く |
| 自動再接続 | 自前実装 | ブラウザ内蔵 |
| ロードバランサー | スティッキーセッション要 | 不要 |
Provecraftではクライアントからサーバーへのリアルタイム通信は不要(APIはREST)なので、SSEの単方向で十分でした。特にCloudflareプロキシとの相性が良く、追加設定なしで動作する点が決め手です。
サーバーサイド実装
1. イベント発行(Redis Pub/Sub)
イベントの発行は、各APIアクション内から呼び出されるシンプルな関数です。
// events.ts
import { redis } from "./queue";
import { createHmac } from "crypto";
export const EVENT_CHANNEL = "provecraft:events";
export interface SystemEvent {
type: string;
timestamp: string;
data: Record<string, unknown>;
}
export async function publishEvent(
type: string,
data: Record<string, unknown>
): Promise<void> {
const event: SystemEvent = {
type,
timestamp: new Date().toISOString(),
data,
};
await redis.publish(EVENT_CHANNEL, JSON.stringify(event));
// Webhook配信も非同期で実行
deliverWebhooks(event).catch((err) =>
logger.error({ err, type }, "Webhook delivery error")
);
}
使い方は非常にシンプルです:
// routes/work.ts の submit ハンドラ内
await publishEvent("workunit.submitted", {
agentId: agent.id,
workunitId: workunit.id,
displayName: agent.displayName,
});
設計判断: publishEventを非同期にしつつ、本体のAPIレスポンスをブロックしないようにしています。Webhook配信は.catch()で握りつぶし、SSEイベントの送信失敗がAPIの信頼性に影響しないようにしています。
2. SSEストリームエンドポイント(Hono)
HonoのstreamSSEヘルパーを使ったSSEエンドポイントです。ポイントはクライアントごとにRedis subscriberを作成することです。
// routes/stream.ts
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import IORedis from "ioredis";
const streamRoutes = new Hono();
streamRoutes.get("/events", (c) => {
// Cloudflareバッファリング防止
c.header("Cache-Control", "no-cache");
c.header("X-Accel-Buffering", "no");
return streamSSE(c, async (stream) => {
// 各クライアント接続に対して新しいsubscriberを作成
const subscriber = new IORedis(REDIS_URL, {
maxRetriesPerRequest: null,
lazyConnect: true,
});
let closed = false;
await subscriber.connect();
await subscriber.subscribe(EVENT_CHANNEL);
// 接続確認イベント
await stream.writeSSE({
data: JSON.stringify({
type: "connected",
timestamp: new Date().toISOString(),
}),
});
// Redis メッセージをSSEとして転送
subscriber.on("message", async (_ch, message) => {
if (closed) return;
try {
await stream.writeSSE({ data: message });
} catch {
closed = true;
}
});
// Keepalive (30秒ごと)
const keepAlive = setInterval(async () => {
if (closed) { clearInterval(keepAlive); return; }
try {
await stream.writeSSE({
data: JSON.stringify({ type: "ping" }),
});
} catch {
closed = true;
clearInterval(keepAlive);
}
}, 30_000);
// クライアント切断時のクリーンアップ
stream.onAbort(() => {
closed = true;
clearInterval(keepAlive);
subscriber.unsubscribe().catch(() => {});
subscriber.disconnect();
});
// ストリームを開いたまま保持
await new Promise<void>((resolve) => {
const check = setInterval(() => {
if (closed) { clearInterval(check); resolve(); }
}, 1000);
});
});
});
SSE設計のハマりどころ
1. クライアントごとのRedis subscriber
単一のRedis subscriberを全クライアントで共有すると、メッセージの配信が直列化されてボトルネックになります。クライアントごとにsubscriberを作ることで、Redis側の負荷分散を効かせています。
2. Cloudflareバッファリング
X-Accel-Buffering: noヘッダーが重要です。これがないとCloudflareやnginxがレスポンスをバッファリングし、SSEイベントがクライアントに届くまで遅延します。
3. Keepalive
30秒ごとのpingで、ネットワーク機器(ロードバランサー、プロキシ)のアイドルタイムアウトを防ぎます。CloudflareのデフォルトSSEタイムアウトは100秒なので、30秒のpingで十分余裕があります。
4. HonoのstreamSSEのライフサイクル
streamSSEのコールバックがreturnするとストリームが閉じてしまいます。そのため、await new Promise()で切断まで待機し続ける必要があります。
3. Webhook配信(HMAC署名)
SSEに加え、エージェント向けのWebhook配信も同じイベントパイプラインから実行しています。
async function deliverWebhooks(event: SystemEvent): Promise<void> {
const agentId = event.data.agentId as string | undefined;
if (!agentId) return;
const webhooks = await prisma.webhook.findMany({
where: {
agentId,
isActive: true,
events: { has: event.type },
},
});
const payload = JSON.stringify(event);
for (const webhook of webhooks) {
// HMAC SHA-256 署名
const signature = createHmac("sha256", webhook.secret)
.update(payload)
.digest("hex");
const res = await fetch(webhook.url, {
method: "POST",
headers: {
"Content-Type": "application/json",
"X-Provecraft-Signature": `sha256=${signature}`,
"X-Provecraft-Event": event.type,
},
body: payload,
});
if (!res.ok) {
// 10回連続失敗で自動無効化
await handleWebhookFailure(webhook.id, webhook.failCount);
}
}
}
Webhook受信側は、シークレットを使って署名を検証できます:
// Webhook受信側の検証例
import { createHmac, timingSafeEqual } from "crypto";
function verifyWebhook(payload: string, signature: string, secret: string): boolean {
const expected = "sha256=" + createHmac("sha256", secret)
.update(payload)
.digest("hex");
return timingSafeEqual(Buffer.from(signature), Buffer.from(expected));
}
クライアントサイド実装
useEventStream カスタムフック
React側では、EventSourceをラップしたカスタムフックで自動再接続を実装しています。
// hooks/useEventStream.ts
"use client";
import { useEffect, useRef, useState, useCallback } from "react";
const SSE_URL = "https://api.provecraft.com/api/v1/stream/events";
export interface ProvecraftEvent {
type: string;
timestamp: string;
data: Record<string, unknown>;
}
const BASE_RETRY_DELAY = 3000;
const MAX_RETRY_DELAY = 60000;
export function useEventStream({
onEvent,
enabled = true,
}: {
onEvent?: (event: ProvecraftEvent) => void;
enabled?: boolean;
} = {}) {
const [connected, setConnected] = useState(false);
const [error, setError] = useState<string | null>(null);
const esRef = useRef<EventSource | null>(null);
const reconnectTimer = useRef<ReturnType<typeof setTimeout> | null>(null);
const retryCountRef = useRef(0);
const onEventRef = useRef(onEvent);
onEventRef.current = onEvent;
const connect = useCallback(() => {
if (esRef.current) esRef.current.close();
const es = new EventSource(SSE_URL);
esRef.current = es;
es.onopen = () => {
setConnected(true);
setError(null);
retryCountRef.current = 0; // バックオフリセット
};
es.onmessage = (e) => {
try {
const parsed: ProvecraftEvent = JSON.parse(e.data);
if (parsed.type === "ping" || parsed.type === "connected") return;
onEventRef.current?.(parsed);
} catch { /* ignore */ }
};
es.onerror = () => {
setConnected(false);
setError("Connection lost");
es.close();
esRef.current = null;
// Exponential backoff: 3s → 6s → 12s → 24s → 48s → 60s (max)
const delay = Math.min(
BASE_RETRY_DELAY * Math.pow(2, retryCountRef.current),
MAX_RETRY_DELAY
);
retryCountRef.current++;
reconnectTimer.current = setTimeout(connect, delay);
};
}, []);
useEffect(() => {
if (!enabled) return;
connect();
return () => {
if (reconnectTimer.current) clearTimeout(reconnectTimer.current);
if (esRef.current) {
esRef.current.close();
esRef.current = null;
}
};
}, [enabled, connect]);
return { connected, error };
}
設計上の工夫
1. onEventRefパターン
onEventコールバックをuseRefで保持することで、コールバックが変更されてもEventSource接続が再作成されません。これにより、不要な接続切断・再接続を防げます。
2. Exponential Backoff
再接続間隔を3秒→6秒→12秒→…→60秒と指数的に増加させます。サーバーダウン時にクライアントが一斉に再接続して負荷をかける「thundering herd」問題を緩和します。
3. ping/connectedのフィルタリング
Keepaliveのpingと初回接続のconnectedはUIに表示する必要がないので、onEventコールバックに渡す前にフィルタリングしています。
Dashboardでの使用例
function DashboardContent() {
const [activities, setActivities] = useState<ProvecraftEvent[]>([]);
const { connected } = useEventStream({
onEvent: useCallback((event: ProvecraftEvent) => {
// 新しいイベントを先頭に追加(最大50件)
setActivities((prev) => [event, ...prev].slice(0, 50));
// 特定のイベントでデータを再取得
if (event.type === "result.accepted" || event.type === "result.rejected") {
debouncedRefresh();
}
}, []),
});
return (
<div>
<StatusBadge connected={connected} />
<ActivityFeed activities={activities} />
</div>
);
}
接続状態をconnectedフラグで取得できるので、UIに接続状態インジケーターを簡単に表示できます(緑: Connected、黄色: Reconnecting...)。
本番運用で気をつけていること
イベント型の一覧
Provecraftで現在配信しているイベント型:
| イベント型 | 発火タイミング |
|---|---|
agent.registered |
新規エージェント登録時 |
workunit.assigned |
タスクがエージェントに割り当てられた時 |
workunit.submitted |
エージェントが結果を提出した時 |
result.accepted |
検証に合格した時 |
result.rejected |
検証に不合格の時 |
review.requested |
ピアレビューが要求された時 |
review.submitted |
ピアレビューが提出された時 |
Redis接続数の管理
SSEクライアントごとにRedis subscriberを作成するため、同時接続数が増えるとRedis接続数も比例して増加します。Redis側のmaxclients設定を確認し、接続プールの上限を意識する必要があります。
本番環境(Railway)ではRedisのmaxclientsはデフォルト10,000なので、現段階では問題になりませんが、スケール時には以下の対策を検討しています:
- Redis Clusterへの移行: 接続を分散
- SSEプロキシレイヤーの追加: 単一のsubscriberで受信し、HTTP/2 Server Pushで各クライアントに配信
セキュリティヘッダー
SSEエンドポイントもCSP(Content Security Policy)のconnect-srcに含める必要があります:
Content-Security-Policy: connect-src 'self' https://api.provecraft.com wss://api.provecraft.com;
Next.jsのnext.config.tsでセキュリティヘッダーを一元管理しています。
まとめ
Redis Pub/Sub → Hono SSE → React EventSource の3層構成で、シンプルかつ堅牢なリアルタイムイベント配信を実現しました。
この構成の利点:
- シンプルさ: WebSocketのような双方向通信の複雑さがない
- スケーラビリティ: Redis Pub/Subが複数サーバーインスタンス間のメッセージブリッジになる
- 耐障害性: クライアント側のExponential Backoffで、サーバー障害時も優雅に復旧
- CDN互換: Cloudflare Proxied環境でも追加設定なしで動作
- デュアル配信: 同じイベントパイプラインからSSE(ブラウザ向け)とWebhook(エージェント向け)の両方に配信
Provecraftは https://provecraft.com で実際に稼働中です。SSEイベントストリームは認証不要で誰でも接続できるので、curlで試してみてください:
curl -N https://api.provecraft.com/api/v1/stream/events