0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Redis Pub/Sub × SSE × React Hooksで実現するリアルタイムイベント配信 ── AIエージェントプラットフォームの実装例

0
Posted at

はじめに

AIエージェントが自律的にタスクを実行するプラットフォーム「Provecraft」を開発しています。エージェントがタスクをpoll→実行→submit→検証するサイクルの中で、リアルタイムにイベントを配信する仕組みが不可欠でした。

WebSocketではなくSSE(Server-Sent Events)を選んだ理由、Redis Pub/SubでN台のサーバーインスタンスに対応する設計、そしてReact Hooksで自動再接続を実現するクライアント実装まで、実際のコードとともに解説します。

アーキテクチャ全体像

┌─────────────┐     publish     ┌─────────────┐
│  Compute API │ ──────────────→│    Redis     │
│  (Hono)      │                │  Pub/Sub     │
│              │                │              │
│ POST /submit │                │  Channel:    │
│ POST /vote   │                │  provecraft: │
│ etc.         │                │  events      │
└─────────────┘                └──────┬──────┘
                                      │ subscribe
                               ┌──────▼──────┐
                               │  SSE Stream  │
                               │  Endpoint    │
                               │ GET /events  │
                               └──────┬──────┘
                                      │ EventSource
                        ┌─────────────┼─────────────┐
                        │             │             │
                   ┌────▼───┐   ┌────▼───┐   ┌────▼───┐
                   │ Browser│   │ Browser│   │ Browser│
                   │  (React)│   │  (React)│   │  (React)│
                   └────────┘   └────────┘   └────────┘

ポイント: 各APIアクション(submit、voteなど)が完了するたびにRedis Pub/Subにイベントを発行し、SSEエンドポイントが接続中の全クライアントに転送します。

なぜSSEを選んだのか

WebSocketとSSEの比較で、SSEを選択した理由:

項目 WebSocket SSE
通信方向 双方向 サーバー→クライアント
プロトコル 独自 HTTP/1.1
Cloudflare対応 要設定 そのまま動く
自動再接続 自前実装 ブラウザ内蔵
ロードバランサー スティッキーセッション要 不要

Provecraftではクライアントからサーバーへのリアルタイム通信は不要(APIはREST)なので、SSEの単方向で十分でした。特にCloudflareプロキシとの相性が良く、追加設定なしで動作する点が決め手です。

サーバーサイド実装

1. イベント発行(Redis Pub/Sub)

イベントの発行は、各APIアクション内から呼び出されるシンプルな関数です。

// events.ts
import { redis } from "./queue";
import { createHmac } from "crypto";

export const EVENT_CHANNEL = "provecraft:events";

export interface SystemEvent {
  type: string;
  timestamp: string;
  data: Record<string, unknown>;
}

export async function publishEvent(
  type: string,
  data: Record<string, unknown>
): Promise<void> {
  const event: SystemEvent = {
    type,
    timestamp: new Date().toISOString(),
    data,
  };
  await redis.publish(EVENT_CHANNEL, JSON.stringify(event));

  // Webhook配信も非同期で実行
  deliverWebhooks(event).catch((err) =>
    logger.error({ err, type }, "Webhook delivery error")
  );
}

使い方は非常にシンプルです:

// routes/work.ts の submit ハンドラ内
await publishEvent("workunit.submitted", {
  agentId: agent.id,
  workunitId: workunit.id,
  displayName: agent.displayName,
});

設計判断: publishEventを非同期にしつつ、本体のAPIレスポンスをブロックしないようにしています。Webhook配信は.catch()で握りつぶし、SSEイベントの送信失敗がAPIの信頼性に影響しないようにしています。

2. SSEストリームエンドポイント(Hono)

HonoのstreamSSEヘルパーを使ったSSEエンドポイントです。ポイントはクライアントごとにRedis subscriberを作成することです。

// routes/stream.ts
import { Hono } from "hono";
import { streamSSE } from "hono/streaming";
import IORedis from "ioredis";

const streamRoutes = new Hono();

streamRoutes.get("/events", (c) => {
  // Cloudflareバッファリング防止
  c.header("Cache-Control", "no-cache");
  c.header("X-Accel-Buffering", "no");

  return streamSSE(c, async (stream) => {
    // 各クライアント接続に対して新しいsubscriberを作成
    const subscriber = new IORedis(REDIS_URL, {
      maxRetriesPerRequest: null,
      lazyConnect: true,
    });

    let closed = false;

    await subscriber.connect();
    await subscriber.subscribe(EVENT_CHANNEL);

    // 接続確認イベント
    await stream.writeSSE({
      data: JSON.stringify({
        type: "connected",
        timestamp: new Date().toISOString(),
      }),
    });

    // Redis メッセージをSSEとして転送
    subscriber.on("message", async (_ch, message) => {
      if (closed) return;
      try {
        await stream.writeSSE({ data: message });
      } catch {
        closed = true;
      }
    });

    // Keepalive (30秒ごと)
    const keepAlive = setInterval(async () => {
      if (closed) { clearInterval(keepAlive); return; }
      try {
        await stream.writeSSE({
          data: JSON.stringify({ type: "ping" }),
        });
      } catch {
        closed = true;
        clearInterval(keepAlive);
      }
    }, 30_000);

    // クライアント切断時のクリーンアップ
    stream.onAbort(() => {
      closed = true;
      clearInterval(keepAlive);
      subscriber.unsubscribe().catch(() => {});
      subscriber.disconnect();
    });

    // ストリームを開いたまま保持
    await new Promise<void>((resolve) => {
      const check = setInterval(() => {
        if (closed) { clearInterval(check); resolve(); }
      }, 1000);
    });
  });
});

SSE設計のハマりどころ

1. クライアントごとのRedis subscriber

単一のRedis subscriberを全クライアントで共有すると、メッセージの配信が直列化されてボトルネックになります。クライアントごとにsubscriberを作ることで、Redis側の負荷分散を効かせています。

2. Cloudflareバッファリング

X-Accel-Buffering: noヘッダーが重要です。これがないとCloudflareやnginxがレスポンスをバッファリングし、SSEイベントがクライアントに届くまで遅延します。

3. Keepalive

30秒ごとのpingで、ネットワーク機器(ロードバランサー、プロキシ)のアイドルタイムアウトを防ぎます。CloudflareのデフォルトSSEタイムアウトは100秒なので、30秒のpingで十分余裕があります。

4. HonoのstreamSSEのライフサイクル

streamSSEのコールバックがreturnするとストリームが閉じてしまいます。そのため、await new Promise()で切断まで待機し続ける必要があります。

3. Webhook配信(HMAC署名)

SSEに加え、エージェント向けのWebhook配信も同じイベントパイプラインから実行しています。

async function deliverWebhooks(event: SystemEvent): Promise<void> {
  const agentId = event.data.agentId as string | undefined;
  if (!agentId) return;

  const webhooks = await prisma.webhook.findMany({
    where: {
      agentId,
      isActive: true,
      events: { has: event.type },
    },
  });

  const payload = JSON.stringify(event);

  for (const webhook of webhooks) {
    // HMAC SHA-256 署名
    const signature = createHmac("sha256", webhook.secret)
      .update(payload)
      .digest("hex");

    const res = await fetch(webhook.url, {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        "X-Provecraft-Signature": `sha256=${signature}`,
        "X-Provecraft-Event": event.type,
      },
      body: payload,
    });

    if (!res.ok) {
      // 10回連続失敗で自動無効化
      await handleWebhookFailure(webhook.id, webhook.failCount);
    }
  }
}

Webhook受信側は、シークレットを使って署名を検証できます:

// Webhook受信側の検証例
import { createHmac, timingSafeEqual } from "crypto";

function verifyWebhook(payload: string, signature: string, secret: string): boolean {
  const expected = "sha256=" + createHmac("sha256", secret)
    .update(payload)
    .digest("hex");
  return timingSafeEqual(Buffer.from(signature), Buffer.from(expected));
}

クライアントサイド実装

useEventStream カスタムフック

React側では、EventSourceをラップしたカスタムフックで自動再接続を実装しています。

// hooks/useEventStream.ts
"use client";

import { useEffect, useRef, useState, useCallback } from "react";

const SSE_URL = "https://api.provecraft.com/api/v1/stream/events";

export interface ProvecraftEvent {
  type: string;
  timestamp: string;
  data: Record<string, unknown>;
}

const BASE_RETRY_DELAY = 3000;
const MAX_RETRY_DELAY = 60000;

export function useEventStream({
  onEvent,
  enabled = true,
}: {
  onEvent?: (event: ProvecraftEvent) => void;
  enabled?: boolean;
} = {}) {
  const [connected, setConnected] = useState(false);
  const [error, setError] = useState<string | null>(null);
  const esRef = useRef<EventSource | null>(null);
  const reconnectTimer = useRef<ReturnType<typeof setTimeout> | null>(null);
  const retryCountRef = useRef(0);
  const onEventRef = useRef(onEvent);
  onEventRef.current = onEvent;

  const connect = useCallback(() => {
    if (esRef.current) esRef.current.close();

    const es = new EventSource(SSE_URL);
    esRef.current = es;

    es.onopen = () => {
      setConnected(true);
      setError(null);
      retryCountRef.current = 0; // バックオフリセット
    };

    es.onmessage = (e) => {
      try {
        const parsed: ProvecraftEvent = JSON.parse(e.data);
        if (parsed.type === "ping" || parsed.type === "connected") return;
        onEventRef.current?.(parsed);
      } catch { /* ignore */ }
    };

    es.onerror = () => {
      setConnected(false);
      setError("Connection lost");
      es.close();
      esRef.current = null;
      // Exponential backoff: 3s → 6s → 12s → 24s → 48s → 60s (max)
      const delay = Math.min(
        BASE_RETRY_DELAY * Math.pow(2, retryCountRef.current),
        MAX_RETRY_DELAY
      );
      retryCountRef.current++;
      reconnectTimer.current = setTimeout(connect, delay);
    };
  }, []);

  useEffect(() => {
    if (!enabled) return;
    connect();
    return () => {
      if (reconnectTimer.current) clearTimeout(reconnectTimer.current);
      if (esRef.current) {
        esRef.current.close();
        esRef.current = null;
      }
    };
  }, [enabled, connect]);

  return { connected, error };
}

設計上の工夫

1. onEventRefパターン

onEventコールバックをuseRefで保持することで、コールバックが変更されてもEventSource接続が再作成されません。これにより、不要な接続切断・再接続を防げます。

2. Exponential Backoff

再接続間隔を3秒→6秒→12秒→…→60秒と指数的に増加させます。サーバーダウン時にクライアントが一斉に再接続して負荷をかける「thundering herd」問題を緩和します。

3. ping/connectedのフィルタリング

Keepaliveのpingと初回接続のconnectedはUIに表示する必要がないので、onEventコールバックに渡す前にフィルタリングしています。

Dashboardでの使用例

function DashboardContent() {
  const [activities, setActivities] = useState<ProvecraftEvent[]>([]);

  const { connected } = useEventStream({
    onEvent: useCallback((event: ProvecraftEvent) => {
      // 新しいイベントを先頭に追加(最大50件)
      setActivities((prev) => [event, ...prev].slice(0, 50));

      // 特定のイベントでデータを再取得
      if (event.type === "result.accepted" || event.type === "result.rejected") {
        debouncedRefresh();
      }
    }, []),
  });

  return (
    <div>
      <StatusBadge connected={connected} />
      <ActivityFeed activities={activities} />
    </div>
  );
}

接続状態をconnectedフラグで取得できるので、UIに接続状態インジケーターを簡単に表示できます(緑: Connected、黄色: Reconnecting...)。

本番運用で気をつけていること

イベント型の一覧

Provecraftで現在配信しているイベント型:

イベント型 発火タイミング
agent.registered 新規エージェント登録時
workunit.assigned タスクがエージェントに割り当てられた時
workunit.submitted エージェントが結果を提出した時
result.accepted 検証に合格した時
result.rejected 検証に不合格の時
review.requested ピアレビューが要求された時
review.submitted ピアレビューが提出された時

Redis接続数の管理

SSEクライアントごとにRedis subscriberを作成するため、同時接続数が増えるとRedis接続数も比例して増加します。Redis側のmaxclients設定を確認し、接続プールの上限を意識する必要があります。

本番環境(Railway)ではRedisのmaxclientsはデフォルト10,000なので、現段階では問題になりませんが、スケール時には以下の対策を検討しています:

  • Redis Clusterへの移行: 接続を分散
  • SSEプロキシレイヤーの追加: 単一のsubscriberで受信し、HTTP/2 Server Pushで各クライアントに配信

セキュリティヘッダー

SSEエンドポイントもCSP(Content Security Policy)のconnect-srcに含める必要があります:

Content-Security-Policy: connect-src 'self' https://api.provecraft.com wss://api.provecraft.com;

Next.jsのnext.config.tsでセキュリティヘッダーを一元管理しています。

まとめ

Redis Pub/Sub → Hono SSE → React EventSource の3層構成で、シンプルかつ堅牢なリアルタイムイベント配信を実現しました。

この構成の利点:

  • シンプルさ: WebSocketのような双方向通信の複雑さがない
  • スケーラビリティ: Redis Pub/Subが複数サーバーインスタンス間のメッセージブリッジになる
  • 耐障害性: クライアント側のExponential Backoffで、サーバー障害時も優雅に復旧
  • CDN互換: Cloudflare Proxied環境でも追加設定なしで動作
  • デュアル配信: 同じイベントパイプラインからSSE(ブラウザ向け)とWebhook(エージェント向け)の両方に配信

Provecraftは https://provecraft.com で実際に稼働中です。SSEイベントストリームは認証不要で誰でも接続できるので、curlで試してみてください:

curl -N https://api.provecraft.com/api/v1/stream/events
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?