6
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AgentCore 上の Claude Code の出力を AppSync Event API でリアルタイムにブラウザへ流す

6
Posted at

はじめに

Claude CodeをAmazon Bedrock AgentCoreで動かすことを下記の記事で知りました。こちらの記事

色々触ってみて、AppSync Event APIを使うことで、DB登録処理などを挟みつつ会話することができたので、そのやり方をご紹介しようと思います。あくまでもAppSync Event APIとClaudeCodeに絞ってますので、DB登録処理などはAppSync Event APIの使い方記事を参考に追加してください。


AppSync Event API とは

AppSync Event API は、AWS AppSync が提供する WebSocket ベースのリアルタイム Pub/Sub サービスです。2024 年に追加された比較的新しい機能で、従来の AppSync GraphQL Subscription とは別のサービスとして提供されています。

基本的な仕組みはシンプルです。

Publisher                          Subscriber
  │                                   │
  │  HTTP POST /event                 │  WebSocket wss://.../event/realtime
  │  (チャネルにイベントを送信)           │  (チャネルを購読)
  ▼                                   ▼
┌─────────────────────────────────────────┐
│           AppSync Event API             │
│                                         │
│  /channel/a  ──→  購読者A, 購読者B        │
│  /channel/b  ──→  購読者C                │
└─────────────────────────────────────────┘
  • Publisher が HTTP POST でチャネルにイベントを送信します
  • Subscriber が WebSocket でチャネルを購読し、イベントをリアルタイムに受信します
  • チャネルは階層的な名前空間(例: /dialog/job-123)で管理されます

GraphQL のスキーマ定義やリゾルバーは不要で、チャネルに JSON を POST するだけで WebSocket 接続中のクライアントに配信されます。

さらに、Event API にはチャネルごとに イベントハンドラー(Lambda)を設定できます。publish 時や subscribe 時にハンドラーを挟むことで、認可チェック、イベントの加工、外部サービスへの通知やデータの永続化といった処理を組み込むこともできます。つまり、単純な Pub/Sub にとどまらず、リアルタイム通知と同時にバックエンド処理を走らせるといった拡張も可能です。

また、同じチャネルを複数のクライアントが同時に購読できます。例えば /dialog/job-123 を複数人が購読していれば、全員が同じイベントをリアルタイムに受信します。チームメンバーが同じエージェントの実行状況を同時にモニタリングする、といった使い方も自然にできます。

今回のユースケースでは、ストリーミングテキストをそのまま流すだけでハンドラーによる加工や永続化は不要なため、最もシンプルな構成で利用しています。


全体の流れ

Browser(React)
  │
  │ ① ユーザーがプロンプトを入力して送信
  │    POST /trigger(Lambda Function URL)
  ▼
Trigger Lambda
  │
  │ ② AgentCore Runtime を invoke
  │    InvokeAgentRuntimeCommand
  ▼
AgentCore Runtime
  │
  │ ③ Claude Code がプロンプトを処理
  │    claude-agent-sdk → Claude Code CLI
  │    TextBlock を出力するたびに↓
  │
  │ ④ HTTP POST(SigV4 認証)
  ▼
AppSync Event API
  │  チャネル: /dialog/{jobId}
  │
  │ ⑤ WebSocket で配信
  ▼
Browser(React)
  │  テキストチャンクを受信 → 画面に追記
  1. ユーザーがブラウザからプロンプトを送信します
  2. Trigger Lambda が AgentCore Runtime を invoke します
  3. AgentCore 上の Claude Code がプロンプトを受け取り、処理を開始します
  4. Claude Code がテキストを出力するたびに、エージェントが Event API の /dialog/{jobId} チャネルに publish します
  5. ブラウザは同じチャネルを WebSocket で購読しており、テキストがリアルタイムに画面に表示されます

ブラウザは ① のリクエスト送信前に Event API の WebSocket 接続を確立しておき、レスポンスを待たずにストリーミングを受信できる状態にしています。


なぜ AppSync Event API を選んだのか

AgentCore 上のエージェントからブラウザにリアルタイムでテキストを配信する方法はいくつかあります。その中で Event API を選んだ理由を整理します。

要件:高頻度・低レイテンシのテキストチャンク配信

Claude Code は数十文字単位でテキストを断続的に出力します。この出力を「ターミナルに文字が流れるように」ブラウザに表示したいため、以下が求められます。

  • 秒間数回のイベント配信に耐えられること
  • 配信までのレイテンシが小さいこと(数百ミリ秒以内)
  • 配信側の実装がシンプルであること(エージェントのコンテナから直接送れること)

Event API が適している理由

特徴 Event API の挙動
配信方法 HTTP POST 1 回で完結する
中間処理 ハンドラーなしなら Lambda を経由しない
スキーマ定義 不要(任意の JSON を送れる)
WebSocket 管理 AWS 側が自動で管理する
認証 IAM / Cognito / API Key など複数方式に対応している

GraphQL Subscription で同様のことを実現しようとすると、mutation の定義 → Lambda リゾルバーの実装 → Subscription のフィルタリング設定が必要になり、テキストチャンクを流すだけの用途にはオーバーヘッドが大きくなります。

Event API は 「チャネルに POST → WebSocket で配信」というシンプルなモデルのため、ストリーミング配信のような高頻度・軽量なユースケースに適しています。


CDK:Event API の定義

// streaming-sample-stack.ts
const eventApi = new appsync.EventApi(this, 'DialogEventApi', {
  apiName: `${prefix}-dialog-event-api`,
  authorizationConfig: {
    authProviders: [
      {
        authorizationType: appsync.AppSyncAuthorizationType.USER_POOL,
        cognitoConfig: { userPool },
      },
      { authorizationType: appsync.AppSyncAuthorizationType.IAM },
    ],
    connectionAuthModeTypes: [appsync.AppSyncAuthorizationType.USER_POOL],
    defaultPublishAuthModeTypes: [appsync.AppSyncAuthorizationType.IAM],
    defaultSubscribeAuthModeTypes: [appsync.AppSyncAuthorizationType.USER_POOL],
  },
});

eventApi.addChannelNamespace('dialog');

配信(publish)は IAM 認証、購読(subscribe)は Cognito User Pool 認証です。これによりエージェントだけが配信でき、認証済みユーザーだけが購読できます

チャネルは /dialog/{jobId} の形式で、ジョブごとに分離しています。必要に応じて購読時に Lambda ハンドラーを挟んで認可チェックを行うこともできます。

AgentCore Runtime への IAM 権限付与

// AgentCoreStack.ts
// Event API への publish 権限
eventApi.grantPublish(this.runtime);

AgentCore Runtime のロールに appsync:EventPublish 権限を付与します。CDK の grantPublish() を使えば 1 行で済みます。


エージェント側:SigV4 で Event API に publish

# agent.py
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest

def _publish_event(channel: str, events: list[dict]) -> None:
    """AppSync Event API にイベントを publish する(SigV4 認証)"""
    credentials = boto3.Session().get_credentials().get_frozen_credentials()
    payload = json.dumps({
        "channel": channel,
        "events": [json.dumps(e) for e in events],
    }).encode("utf-8")

    # SigV4 署名を生成
    request = AWSRequest(
        method="POST",
        url=EVENT_API_ENDPOINT,
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    SigV4Auth(credentials, "appsync", AWS_REGION).add_auth(request)

    req = urllib.request.Request(
        EVENT_API_ENDPOINT,
        data=payload,
        headers=dict(request.headers),
        method="POST",
    )
    urllib.request.urlopen(req, timeout=10)

botocoreSigV4Auth で署名を付与しています。外部ライブラリは不要で、boto3 に同梱されているモジュールだけで完結します。Event API の HTTP エンドポイントは https://{host}/event です。

claude-agent-sdk のストリーミングループから呼ぶ

async for message in query(prompt=_prompt_stream(), options=options):
    if isinstance(message, AssistantMessage):
        for block in message.content:
            if hasattr(block, "text"):
                # バックグラウンドスレッドでストリーミング配信
                threading.Thread(
                    target=lambda jid=job_id, txt=block.text: _publish_event(
                        f"/dialog/{jid}",
                        [{"type": "stream", "text": txt}]
                    ),
                    daemon=True,
                ).start()

Claude Code がテキストを出力するたびに _publish_event() をバックグラウンドスレッドで呼びます。threading.Thread で非同期にすることで、HTTP POST のレイテンシが Claude Code の処理をブロックしないようにしています(詳細はハマりポイント 3 で後述)。


フロントエンド側:WebSocket で購読

// EventApiClient.ts

// 接続時:サブプロトコルで認証
const httpHost = REALTIME_ENDPOINT
  .replace("wss://", "").replace("/event/realtime", "")
  .replace("appsync-realtime-api", "appsync-api");

const authProtocol = "header-" + btoa(
  JSON.stringify({ Authorization: idToken, host: httpHost })
).replace(/\+/g, "-").replace(/\//g, "_").replace(/=+$/, "");

this.ws = new WebSocket(REALTIME_ENDPOINT, [
  "aws-appsync-event-ws",
  authProtocol,
]);

// connection_init を送信
this.ws.onopen = () => {
  this.ws.send(JSON.stringify({ type: "connection_init" }));
};

// 購読時:authorization オブジェクトが必要
this.ws.send(JSON.stringify({
  type: "subscribe",
  id: crypto.randomUUID(),
  channel: `/dialog/${jobId}`,
  authorization: {
    Authorization: idToken,
    host: httpHost,
  },
}));

// データ受信
this.ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.type === "data") {
    const payload = JSON.parse(data.event);
    if (payload.type === "stream") {
      onMessage(payload.text);
    }
  }
};

注意点が 2 つあります。

  1. Base64 は URL-safe にする必要があります+-/_、末尾の = を除去します。通常の btoa() だけでは WebSocket サブプロトコルとして無効になります。
  2. subscribe メッセージには authorization オブジェクトが必要です。接続時のサブプロトコルとは別に、チャネル購読時にも認証情報を渡します。

ハマりポイント 1:WebSocket の認証ヘッダー

AppSync Event API の WebSocket 接続時に、Cognito の ID トークンを渡す方法に苦労しました。

通常の HTTP リクエストなら Authorization ヘッダーを付ければ済みますが、ブラウザの WebSocket API はカスタムヘッダーを送れません。AppSync Event API は WebSocket のサブプロトコルとして認証情報を渡す仕様になっています。

// ❌ これでは認証情報を渡せない
new WebSocket(url, { headers: { Authorization: token } });

// ✅ サブプロトコルとして URL-safe Base64 エンコードしたヘッダーを渡す
const encoded = btoa(JSON.stringify({
  host: EVENT_API_HOST,
  Authorization: idToken,
})).replace(/\+/g, "-").replace(/\//g, "_").replace(/=+$/, "");

new WebSocket(url, ["aws-appsync-event-ws", `header-${encoded}`]);

通常の btoa() だけでは WebSocket サブプロトコルとして無効になります。+/= はサブプロトコル名に使えない文字のため、URL-safe Base64 への変換が必須です。この仕様は通常の AppSync GraphQL Subscription(Amplify が自動で処理する)とは全く異なるため見落としやすいポイントです。


ハマりポイント 2:Event API のホスト名は GraphQL API とは別

AppSync Event API のエンドポイントは、通常の GraphQL API とは別のドメインです。

GraphQL API:  https://xxxxx.appsync-api.ap-northeast-1.amazonaws.com/graphql
Event API:    https://xxxxx.appsync-eventapi.ap-northeast-1.amazonaws.com/event

最初は GraphQL API のドメインに Event API のリクエストを送っていたため 404 が返り続けました。CDK で EventApi を作成すると別のドメインが払い出されるため、SSM Parameter Store 経由でフロントエンドとエージェントの両方に渡すようにしています。

// ApiStack.ts
new ssm.StringParameter(this, 'EventApiEndpoint', {
  parameterName: `/${props.stage}/appsync/event-api-endpoint`,
  stringValue: this.eventApi.httpDns,  // ← GraphQL API の dns とは別
});

ハマりポイント 3:ストリーミング配信は必ずバックグラウンドスレッドで

_publish_event() は HTTP POST で AppSync に送信するため、数十ミリ秒のレイテンシがあります。これを Claude Code のメッセージループ内で同期的に呼ぶと、Claude Code の処理自体が遅くなります

# ❌ 同期的に呼ぶと Claude Code の処理がブロックされる
for block in message.content:
    _publish_event(f"/dialog/{job_id}", [{"type": "stream", "text": block.text}])

# ✅ バックグラウンドスレッドで非同期に送信
for block in message.content:
    threading.Thread(
        target=lambda jid=job_id, txt=block.text: _publish_event(
            f"/dialog/{jid}", [{"type": "stream", "text": txt}]
        ),
        daemon=True,
    ).start()

daemon=True にすることで、メインプロセスの終了時にスレッドも自動的に終了します。配信が数十ミリ秒遅れても、ストリーミング表示においてはほぼ気になりません。

ただし、バックグラウンドスレッドで配信する場合、完了イベント(complete)を送る前に全スレッドの完了を待つ必要があります。そうしないと complete が TextBlock より先にクライアントに届き、クライアントが切断してしまいます。

publish_threads: list[threading.Thread] = []

for block in message.content:
    t = threading.Thread(
        target=lambda jid=job_id, txt=block.text: _publish_event(...),
        daemon=True,
    )
    publish_threads.append(t)
    t.start()

# 全 publish の完了を待ってから complete を送る
for t in publish_threads:
    t.join(timeout=10)

_publish_event(f"/dialog/{job_id}", [{"type": "complete", ...}])

まとめ

AppSync Event API を使って AgentCore 上の Claude Code の出力をリアルタイムにブラウザへ配信する実装を整理します。

項目 内容
配信方式 AppSync Event API(HTTP POST → WebSocket)
チャネル設計 /dialog/{jobId} でジョブごとに分離
認証(配信側) IAM (SigV4) — エージェントのみ publish 可能
認証(購読側) Cognito User Pool — 購読時に Lambda で認可チェック
配信タイミング claude-agent-sdkAssistantMessage.TextBlock ごと
非同期化 threading.Thread でメインループをブロックしない

Event API を選んだ理由は「チャネルに POST するだけで WebSocket クライアントに即座に届く」というシンプルさです。GraphQL のスキーマ定義もリゾルバーも不要で、認証・認可は AppSync 側が管理してくれるため、エージェントの実装はテキストを HTTP POST するだけで済みます。

さらに、Event API はイベントハンドラーを通じてログの永続化や外部通知といった拡張も可能なため、今後ストリーミングテキストの保存や通知連携が必要になった場合にも、同じ配信経路の上に機能を追加できます。

検証に作成したサンプルコードはこちら

6
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?