検証バージョン:
@cloudflare/ai-chat 0.8.6/agents 0.16.2/@assistant-ui/react-ai-sdk 1.3.37
内部実装を引用しているため、バージョンが変わると挙動が変わる可能性があります。
はじめに
Cloudflare Workers + Durable Objects + AI SDK で ChatGPT クローンを作っていたときのことです。会話のブランチ機能(メッセージを編集して別の返答を試す機能) を実装しようとしました。
ChatGPT や Claude でもおなじみの、こういうやつです。
[あなた] こんにちは
[AI] やあ!
[あなた] 今日の天気は? ← ここを編集して別の質問に変えると…
└─ ブランチA: [あなた] 今日の天気は? → [AI] 晴れです
└─ ブランチB: [あなた] 好きな食べ物は? → [AI] ラーメンです
実装してみると、AIChatAgent(Cloudflare が提供する Durable Object の基底クラス)がこのユースケースを想定していない ことが至るところで分かりました。その理由を技術的に解説します。
「SDK 全体が悪い」という話ではなく、AIChatAgent の会話永続化モデルの設計前提と、ブランチ機能の要件が3か所で噛み合わないという話です。
構成の概要
今回のスタックはこれ。
フロントエンド
@cloudflare/ai-chat (useAgentChat)
@assistant-ui/react-ai-sdk (useAISDKRuntime)
バックエンド
Cloudflare Workers
Durable Objects (AIChatAgent を継承)
D1 (SQLite、会話メタデータ管理)
AIChatAgent は Cloudflare が提供する Durable Object の基底クラスです。WebSocket でクライアントと通信しながら、メッセージを Durable Object 内蔵の SQLite に保存します。
問題 1: _deleteStaleRows が新規メッセージありの経路では機能しない
ブランチ機能の核心は「古いメッセージを捨てて、新しい文脈で AI に返答させる」ことです。
クライアント側では @assistant-ui の onEdit 実装が sliceMessagesUntil() を呼んでメッセージを切り詰めます。その後 sendMessage() で新しいメッセージを送ります。このとき AI SDK の WebSocket トランスポートは切り詰めたメッセージ列を _deleteStaleRows: true と一緒にサーバーへ送ります。
期待していた動作: サーバー(Durable Object)が古いメッセージを削除してくれる。
実際の動作: 削除されない。
AIChatAgent の persistMessages を読むと、その理由が分かります。
// 以下は agents/dist/chat/index.js の該当箇所の要約(v0.16.2 時点)
async persistMessages(messages, excludeBroadcastIds, options) {
const mergedMessages = reconcileMessages(messages, this.messages, ...);
if (options?._deleteStaleRows) {
const serverIds = new Set(this.messages.map((m) => m.id));
// ← ここが問題
if (mergedMessages.every((m) => serverIds.has(m.id))) {
// 全メッセージが「既にサーバーに存在する」ときだけ古い行を削除する
const staleIds = ...filter((id) => !keepIds.has(id));
this._deleteMessagesByIds(staleIds);
}
// 新しいメッセージID(まだサーバーに存在しない)が1つでもあれば
// このブロック全体がスキップされる
}
}
_deleteStaleRows の削除処理は「送られてきた全メッセージが既にサーバー側に存在する」ときだけ実行されます。
この guard は意図的な設計です。「クライアントが新しいメッセージを送りながら、既存データを誤消去してしまう」事故を防ぐためのセーフティです。
ブランチ作成時は ユーザーの新しいメッセージ(新規 ID)が必ず含まれる ので、このガードが常に弾かれます。結果として Durable Object の SQLite には全ブランチのメッセージが永遠に蓄積していきます。
問題 2: 初期復元・同期の両経路がブランチを知らない
Durable Object に全ブランチのメッセージが溜まっているので、ページをリロードすると全メッセージがクライアントに届きます。
DO の SQLite に保存されているメッセージ(例):
M1(main), AI1(main), M2(main), AI2(main) ← main ブランチ
M3(branchA), AI3(branchA) ← branch A
M4(branchB), AI4(branchB) ← branch B
M5(branchC), AI5(branchC) ← branch C
↓ ページロード後に全部届く
クライアントは全 10 メッセージを表示してしまう
届くルートは以下の2つです。
- HTTP GET
/get-messages—useAgentChatが初期マウント時に叩く REST エンドポイント。DO の SQLite 全件をそのまま返す。 - WebSocket
cf_agent_chat_messages— WebSocket 接続後のメッセージ同期。サーバーがpersistMessagesを呼ぶたびにブロードキャストされる。
どちらも DO の SQLite をそのまま流すので、ブランチという概念を持っていません。
これを解決するには、クライアント側でブランチ系譜に基づいたフィルタリングを自前実装するしかありませんでした。
// サーバー側の collectBranchMessageIds と同じロジックをクライアントで再実装
// D1 から取得した branches・mappings を使って可視メッセージを計算する
function computeVisibleIds(
messages: { id: string }[],
targetBranchId: string,
branches: Branch[],
mappings: MessageMapping[],
): Set<string> {
const messageOrder = new Map(messages.map((m, i) => [m.id, i]));
const branchMap = new Map(branches.map((b) => [b.id, b]));
// ...ブランチ系譜を遡ってメッセージIDセットを構築
}
ページロード時の初期フィルタです。
// 初回ロード時のみ実行。localStorage からブランチIDを復元し D1 で可視IDを計算する
useEffect(() => {
if (hasStartedFilter.current || chat.messages.length === 0) return;
hasStartedFilter.current = true;
allOriginalIds.current = new Set(chat.messages.map((m) => m.id));
const savedBranchId =
localStorage.getItem(`branch_${conversationId}`) ?? "main";
if (savedBranchId === "main") return;
fetchBranchData(savedBranchId).then(({ branches, mappings }) => {
const visibleIds = computeVisibleIds(
chat.messages,
savedBranchId,
branches,
mappings,
);
chat.setMessages(chat.messages.filter((m) => visibleIds.has(m.id)));
});
}, [chat.messages.length]);
さらに、AI 応答後などサーバーが全メッセージをブロードキャストするたびに再フィルタが必要です。
// cf_agent_chat_messages 受信ごとに動作。新規メッセージは常に可視に追加する
useEffect(() => {
if (!hasFilteredOnLoad.current) return;
const msgs = chat.messages;
for (const m of msgs) {
if (!allOriginalIds.current.has(m.id)) {
visibleIdsRef.current.add(m.id); // 今セッションの新規メッセージは常に表示
}
}
const filtered = msgs.filter((m) => visibleIdsRef.current.has(m.id));
if (filtered.length < msgs.length) {
chat.setMessages(filtered);
}
}, [chat.messages]);
なぜサーバー側フィルタを採らなかったか。DO の onMessage をオーバーライドして WebSocket 接続の branch_id パラメータでフィルタする方法もあります。ただしこれは AIChatAgent の内部ブロードキャストロジックへ深く踏み込む必要があり、SDK のアップデートで壊れるリスクが高いです。クライアント側フィルタを採れば DO 実装は汚れないものの、「サーバーから全件届いてからフィルタ」というやや非効率な経路をたどります。
問題 3: setMessages のラップが必要だが SDK が邪魔をする
@assistant-ui/react-ai-sdk の onEdit(メッセージ編集)は内部で以下を実行します。
// useAISDKRuntime.ts の onEdit 実装(v1.3.37 時点の実コード・要約)
onEdit: async (message) => {
// parentId = 編集するメッセージの「前」のメッセージID
chatHelpers.setMessages((current) =>
sliceMessagesUntil(current, message.parentId),
);
await chatHelpers.sendMessage(createMessage, { ... });
},
ブランチを作成するタイミングは「setMessages が短縮化された瞬間」です。この関数呼び出しを横から検知する仕組みが必要です。
しかし chatHelpers.setMessages を外側でラップしても、SDK 内部の WebSocket ハンドラは元の関数を直接参照しています。そのため、サーバーからのメッセージ更新はラッパーを経由しません。
// @cloudflare/ai-chat/dist/react.js 内部の概念的な構造(実コードを元に要約)
const { setMessages } = useChatHelpers; // 元の setMessages への参照
// WebSocket ハンドラ(クローズアップした変数を直接使う)
case "cf_agent_chat_messages":
setMessages(next); // ← ラッパーを経由しない
// 外部 API として公開
return { setMessages, ... };
整理するとこうなります。
-
onEdit→ ラッパーを経由する(fork 検知可能) - WebSocket からのメッセージ更新 → ラッパーを経由しない(直接適用される)
これが問題2でリアクティブな useEffect フィルタリングが必要になった根本理由です。
問題 4: 後付けで implicit main を採用したことで生じた移行の罠
これは SDK の制約ではなく、自分の設計判断に起因する問題です。
D1 に branches テーブルを作ってブランチを管理するとき、main(最初のブランチ)だけは DB に行を作らず暗黙的に扱うことにしました。明示的に "main" を INSERT するタイミングがないため、こう設計しました。
-- branches テーブル(main の行は存在しない)
id | parent_branch_id | fork_message_id
branchA-xxx | main | msg-ai-123
branchB-xxx | main | msg-ai-456
ところがメッセージを branch にタグ付けする処理を書くとき、ブランチ機能追加前に送ったメッセージは branch_messages テーブルに存在しない という問題が発生しました。
最初は「タグ未登録のメッセージを全部現在のブランチにタグ付けする」と書きましたが、これだと別ブランチの孤立したメッセージ(onFinish が失敗して保存されなかったもの)が混入します。
逆に「最後の1件だけタグ付け」にすると main ブランチではコンテキストが全消えになります。過去メッセージが branch_messages に登録されていないため、collectBranchMessageIds("main") が空を返すからです。
結局、ブランチごとに処理を分けるしかありませんでした。実装の要約は以下のとおりです。
// 以下は実装の要約。implicit main を採用した場合の移行対策
private async tagNewMessagesWithBranch(branchId: string): Promise<void> {
if (branchId === "main") {
// main: 未タグのメッセージを全部 main に紐付け
// ブランチ機能追加前の既存メッセージの後付けマイグレーションも兼ねる
const tagged = await this.getTaggedMessageIds();
const toTag = this.messages.filter((m) => !tagged.has(m.id));
for (const msg of toTag) await this.tagMessage(msg.id, "main");
return;
}
// 非 main: 最後の1件のみ(他ブランチ由来の孤立メッセージ混入を防ぐ)
const lastMsg = this.messages[this.messages.length - 1];
if (!await this.isTagged(lastMsg.id)) {
await this.tagMessage(lastMsg.id, branchId);
}
}
後付けでブランチ機能を追加するなら、main を明示的に DB に持つか、ゼロから branch_messages テーブルでタグ付けを始めましょう。implicit main は運用が始まった後に変更するコストが高いです。
そもそもなぜこんなに難しいのか
AIChatAgent の設計上の前提をまとめると以下のとおりです。
| 前提 | 説明 |
|---|---|
| メッセージ履歴は単一系列 | 全メッセージを 1 つの SQLite テーブルに順番に格納する |
| 復元時は全履歴を届ける | HTTP GET と WebSocket の両経路でクライアントに全メッセージを送って状態を復元する |
| 削除は安全優先 | 新規 ID が含まれるリクエストでは既存データを消さない(誤消去防止の guard) |
ブランチ機能はこのどれとも相性が悪いです。
- 「複数の系列が並立する」→ 単一系列前提と矛盾
- 「ブランチごとに見えるメッセージが違う」→ 全履歴送付と矛盾
- 「分岐後も古い系列は残したい」→ 安全優先の削除 guard と矛盾
設計段階で分かっていたら
SDK でブランチ機能を実装する前に確認すべきことをまとめます。
- メッセージ履歴をどこに持つか(DO か、DB か、両方か)
- 再接続・リロード時にどうリストアするか
- 「削除」の単位は会話単位か、メッセージ単位か
- root(main)ブランチを明示的に管理するか、暗黙にするか
設計段階でブランチ有りを前提にしていれば、こういう実装が選べました。
-
DO ではなく D1 に全メッセージを保存し、
branch_idで SQL フィルタ- DO は WebSocket のストリーミングセッション管理専用にする
- 永続化の正本は D1 の message DAG に寄せることで、ブランチ分離が SQL クエリ1本で済む
-
初回から
branch_messagesテーブルを使ってタグ付け(後付け移行問題が消える) -
WebSocket 接続時に
branch_idパラメータを渡してサーバー側フィルタリング- HTTP GET
/get-messagesもパラメータ対応させれば、クライアント側の複雑なフィルタロジックが不要になる
- HTTP GET
-
あるいは
1ブランチ = 1DOという割り切りも選択肢の1つ- ブランチごとに DO インスタンスを分ける。会話の fork 時に DO を複製するコストが増えるが、設計は最もシンプルになる
後付けで機能を追加するときは、既存の SDK の設計前提をまず読み解くところから始めましょう。