4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MCPプロトコル:SSE対Streamable HTTP比較

Posted at

Cover Image

サーバーサイドイベント vs ストリーマブルHTTP:TypeScript SDKから学ぶリアルタイム通信の進化 🚀

Server-Sent Events(SSE)は、もうおなじみの技術でしょう。サーバーからクライアントへデータをプッシュする、シンプルで優れた仕組みです。しかし、Model Context Protocol(MCP)SDKのような最新フレームワークを深掘りすると、「Streamable HTTP」という別の用語に出会います。一見SSEに似ているものの、完全に同一ではありません。これは混乱を招きます。両者は同じものなのか?どちらが優れているのか?なぜ2つの方式が存在するのか?

本記事では、この混乱を徹底的に解消します。お気に入りの飲み物を用意してください。長く、しかし有益な旅になるでしょう!実際のTypeScript SDKを通じて、レガシーなHTTP+SSEトランスポートとモダンなStreamable HTTPトランスポートの両方をコードレベルで徹底解剖していきます。この長編記事を読み終える頃には、両者の違いを理解するだけでなく、一方から他方への優雅なエンジニアリング進化を心から評価できるようになるはずです。

💡 簡単なアナロジー:2台の電話機 vs 1台のスマートフォン

コードに入る前に、核心的な違いを捉えるシンプルなアナロジーから始めましょう。

レガシーHTTP+SSEは、2台の別々の旧式電話を使うようなもの

固定電話(GET)があり、これは受信専用です。サーバーはこの回線を開いたままにして、いつでも好きな時にあなたに話しかけられます。

公衆電話(POST)があり、これは発信専用です。サーバーに何か伝えたいたびに、公衆電話まで行き、電話をかけ、用件を伝え、切る必要があります。

重要なのは、サーバーが固定電話で最初に行うことが、公衆電話のアドレスをあなたに伝えることです。

モダンなStreamable HTTPは、現代のスマートフォン通話のようなもの

1回の通話(POST)を行います。

この1回の通話で、あなたはサーバーに話しかけ(リクエストを送信)、サーバーも継続的にあなたに話し返すことができます(レスポンスをストリーミング)。メインの会話を中断せずに「テキストメッセージ」(進捗更新)を送ることも可能です。

別途「聞き専用」チャネル(GET)を開くオプションもあります。これはサーバーをスピーカーフォンに切り替えてバックグラウンドでアナウンスを聞くようなものですが、双方向通信には必須ではありません。

このアナロジーの本質はこうです:レガシープロトコルは双方向通信を実現するために2つの別々の非対称チャネルを必要としますが、モダンなプロトコルはこれを単一の、より強力なHTTPトランザクションに統合します。

では、コードでそれを証明しましょう。

究極の深掘り:機能ごとのプロトコル対決 🔧

ここからは研究者モードに入ります。リアルタイム通信の核心概念を分解し、各プロトコルがそれらをどう処理するかを、SDKから直接抜粋したコードスニペットで裏付けながら比較します。

接続と通信モデル

これが最も根本的な違いであり、アーキテクチャ変更の大半の源泉です。

属性 レガシーHTTP+SSE モダンStreamable HTTP
主要チャネル 2つの別々のチャネル:
1. サーバーからクライアントへのメッセージ用の永続的なGET
2. クライアントからサーバーへのメッセージ用の一時的なPOST
統合ハイブリッドチャネル:
単一のPOSTでクライアントのリクエストとサーバーからクライアントへのストリーミングレスポンスの両方を処理。サーバーからの一方的なイベント用の別個のGETチャネルはオプション。
ハンドシェイク 必須で非対称。クライアントのGET接続後、サーバーからカスタムendpointイベントを待ち、POSTの送信先を学ぶ必要がある。 暗黙的で柔軟。クライアントはinitialize POSTを送信。サーバーのレスポンス(202 Acceptedまたは200 OK)が次のステップを指示。カスタムハンドシェイクイベントは不要。
柔軟性 硬直的。2チャネルモデルが唯一の動作方式。 高度に柔軟。サーバーはリクエストの性質に応じて、POSTに対して単一のJSONオブジェクト(クラシックRPC)またはフルイベントストリームで応答可能。

レガシーSSEの内部構造

レガシーシステム全体は、endpointイベントハンドシェイクに依存しています。クライアントは接続し、サーバーが「どこに返答すべきか」を伝えてくるまで、事実上「盲目的に聞いている」状態です。

クライアント側(client/sse.ts):指示を待つ

// ソース: ./guides/typescript-sdk/src/client/sse.ts

this._eventSource.addEventListener("endpoint", (event: Event) => {
  // サーバーがPOSTリクエスト用URLをこのイベントのデータで送信
  this._endpoint = new URL((event as MessageEvent).data, this._url);
  // ...
  // この時点で接続が完全に確立されたと見なされる
  resolve();
});

サーバー側(server/sse.ts):指示を与える

// ソース: ./guides/typescript-sdk/src/server/sse.ts

// サーバーの最初の行動はカスタムendpointイベントを書き込むこと
this.res.write(
  `event: endpoint\ndata: /messages?sessionId=${this._sessionId}\n\n`,
);

この2ステップのダンスは巧妙ですが、初期接続に複雑さとレイテンシを追加します。

Streamable HTTPの内部構造

Streamable HTTPはこれをエレガントに排除します。クライアントがPOSTリクエストを送信し、サーバーのレスポンスが全体のフローを指示します。client/streamableHttp.tsのクライアントsend()メソッドを見てください。適応的レスポンス処理の傑作です。

クライアント側(client/streamableHttp.ts):サーバーのレスポンスを解釈

// ソース: ./guides/typescript-sdk/src/client/streamableHttp.ts

const contentType = response.headers.get("content-type");

if (contentType?.includes("text/event-stream")) {
  // サーバーがこの同じ接続でレスポンスをストリーミング中
  this._handleSseStream(response.body, { onresumptiontoken }, false);
} else if (contentType?.includes("application/json")) {
  // サーバーがシンプルな単一JSONレスポンスを送信
  const data = await response.json();
  // ... JSONデータを処理
}

これはHTTPのはるかに自然で強力な使い方です。クライアントが質問し、サーバーは短い答え(JSON)または長く詳細なストーリー(ストリーム)を選択できます。

セッションと状態管理

クライアントとサーバーは、誰と話しているかをどう追跡するのでしょうか?

属性 レガシーHTTP+SSE モダンStreamable HTTP
セッション開始 セッションIDはサーバーが作成し、endpointイベントのデータURL内のクエリパラメータとして返送される セッションIDはサーバーが作成し、専用HTTPヘッダー(mcp-session-id)で返送される
セッション追跡(クライアント) クライアントはendpointイベントからURLをパースし、すべての後続POSTにsessionIdクエリパラメータを付加する必要がある クライアントは最初のレスポンスからmcp-session-idヘッダーを読み取り、保存し、_commonHeaders()経由ですべての後続リクエストにヘッダーとして追加するだけ
セッション追跡(サーバー) アプリケーションレベルのマップが必要。サーバーコードは、受信POSTを正しいクライアントの開いているGETストリームに関連付けるためにマップ(例:transports[sessionId])を維持する必要がある トランスポートによってよりクリーンに処理される。simpleStreamableHttp.tsに見られるように、トランスポートインスタンスをセッションに直接関連付けられ、アプリケーションロジックが簡素化される

ここでの重要なポイントは、レガシープロトコルがアプリケーション開発者に、セッションIDをトランスポートインスタンスにマッピングしてセッション状態を正しく管理する負担を大きく課すことです。モダンなプロトコルは、まさにこの種のメタデータ用に設計された標準HTTPヘッダーを使用することで、これをよりクリーンにします。

再開可能性と信頼性 ⚠️

モバイルネットワークがリクエスト中にドロップしたらどうなるでしょうか?ここでモダンなプロトコルが真に輝きます。

属性 レガシーHTTP+SSE モダンStreamable HTTP
接続再開 サポートされていない。GETストリームがドロップすると接続が失われる。クライアントは最初からやり直す必要がある ファーストクラス機能。これがプロトコル存在の主要理由の1つ
メカニズム N/A トークンベース:
1. サーバーが各SSEイベントにid:フィールドを送信(再開トークン)
2. クライアントが最後に見たトークンを永続化
3. 再接続時、クライアントがlast-event-id HTTPヘッダーを送信
4. サーバーがEventStoreを使用して見逃したメッセージを再生
サーバー側要件 N/A メッセージ履歴を永続化して再生するための、サーバー上のプラグ可能なEventStoreコンポーネントが必要

Streamable HTTPの再開フロー(芸術作品です)

これがキラー機能です。長時間実行されるAIタスクが99個の進捗更新を送信したとします。Wi-Fiが一瞬途切れます。レガシープロトコルでは、すべてが失われます。Streamable HTTPでは、次のようになります:

トークンのキャプチャ(client/streamableHttp.ts):クライアントはイベントを受け取るたびにidフィールドをチェックして保存

// ソース: ./guides/typescript-sdk/src/client/streamableHttp.ts

// _handleSseStream内で
if (event.id) {
  lastEventId = event.id;
  onresumptiontoken?.(event.id); // これを永続化!
}

切断の検出:ストリーム処理ループ内のネットワークエラーがキャッチされ、再接続ロジックがトリガーされる

再接続の試行:クライアントは(指数バックオフで)待機し、新しいGETリクエストを行うが、今回は特別なヘッダーを追加

// ソース: ./guides/typescript-sdk/src/client/streamableHttp.ts

// _startOrAuthSse内で
if (resumptionToken) {
  headers.set("last-event-id", resumptionToken);
}

サーバー側リプレイ(server/streamableHttp.ts):サーバーはこのヘッダーを見て、新しいストリームを開始する代わりに、EventStoreにクエリして提供されたトークン以降のすべてのメッセージを見つけ、通常の操作を再開する前に新しい接続でそれらを送信

これにより、Streamable HTTPで構築されたアプリケーションは、一時的なネットワーク問題に対して驚くほど回復力があります。

主要機能とユースケース

それぞれのプロトコルは、どのようなアプリケーションに最適なのでしょうか?

属性 レガシーHTTP+SSE モダンStreamable HTTP
進捗更新 不格好。サーバーはGETストリームで通知を送信できるが、タスクを開始したPOSTリクエストに直接紐付いていない シームレス。長時間実行されるツール呼び出しがPOST経由で行われる。サーバーは同じPOSTのレスポンスボディで進捗更新をストリーミングでき、すべてを単一トランザクション内に整然とスコープ化
インタラクティブな引き出し 可能だが不自然。サーバーはGETストリームでリクエストを送信。クライアントは新しいPOSTで応答。サーバーはそのPOSTを元のリクエストと関連付ける必要がある 自然。これがコアユースケース。サーバーは任意の時点でオプションのスタンドアロンGETストリームでelicitInputリクエストを送信でき、真の双方向会話型AIを実現
理想的なユースケース シンプルな一方向サーバーからクライアントへの通知システム(例:「新しい記事が投稿されました!」) 複雑でステートフルなインタラクティブアプリケーション(例:AIエージェント、長時間実行データ処理ツール、リアルタイムコラボレーションダッシュボード)

ここまでハイレベルな違いを見てきました。さらに深く、モダンなStreamable HTTPプロトコルに潜り込んで、このエレガンスがコードでどう実現されているかを見ていきましょう。

パート1:全体像 - なぜこのプロトコルが存在するのか

哲学:単なるリクエストとレスポンス以上のもの 📝

Streamable HTTPプロトコルの中核には、標準的なHTTPメソッドを型破りな方法で使用する、巧妙なデュアルチャネルモデルが構築されています。レストランの通信システムのように考えてみてください:

コマンドチャネル(HTTP POST 🗣️):これはキッチンへの直通ラインです。注文(リクエストまたは通知)を行うために使用します。ここでの魔法は、ウェイター(サーバー)が注文の準備中に同じライン上であなたに話しかけられることです。進捗更新(「シェフが今ステーキを焼いています!」)や部分的な結果(「待っている間に前菜をどうぞ」)を提供できます。これはすべて、単一のPOSTリクエストへのレスポンス内で処理され、レスポンス自体がイベントのストリームになる可能性があります。

アナウンスチャネル(HTTP GET 📢):これはレストランのPAシステムです。一度チューニング(長時間のGETリクエストを行う)すれば、レストランからの一般的なアナウンスを聞くことができます(「今夜のスペシャルは...」)。これらは、あなたが行った特定の注文に紐付かない、サーバー主導の一方的なイベントです。

この設計により、両方の世界の最良を得られます:コマンドのための馴染みのある直接的なPOSTの性質と、非同期更新のためのGETベースのServer-Sent Events(SSE)ストリームの永続的で低レイテンシな性質です。このシステム全体は、client/streamableHttp.tsserver/streamableHttp.tsの実装によって生き生きとしています。

作戦の頭脳:抽象Protocolクラス 🧠

HTTP部分に入る前に、コアロジック層であるshared/protocol.tsの抽象Protocolクラスを理解する必要があります。HTTPトランスポート層は配管(パイプとワイヤー)だと考えてください。しかし、このProtocolクラスは、それらを通じて何が流れるかを決定する脳です。JSON-RPC 2.0フレーミング、リクエストライフサイクル、信頼性の細部を処理します。

リクエストとレスポンスのマッチング方法

アプリケーションコードがclient.request(...)を呼び出すとき、特に複数のリクエストが同時に発生している場合、どのレスポンスがそれに属するかをどう知るのでしょうか?

すべては一意のIDから始まります。Protocolクラスはカウンター_requestMessageIdを維持し、送信するすべてのリクエストに新しいIDを割り当てます。次に、Promiseを作成し、そのresolvereject関数を、メッセージIDをキーとして_responseHandlersというMapに巧妙に保存します。

これがその重要なコード部分です。クライアントが守るつもりの約束をする瞬間です。

// ソース: ./guides/typescript-sdk/src/shared/protocol.ts

// クライアントは罠を仕掛けています。「`messageId`のレスポンスが到着したら、
// この関数を実行してPromiseをresolveまたはrejectしてください」と言っているのです。
this._responseHandlers.set(messageId, (response) => {
  // まず、リクエストがこちら側ですでにキャンセルされたかチェック
  if (options?.signal?.aborted) {
    return;
  }

  // レスポンスがエラーの場合、Promiseをreject
  if (response instanceof Error) {
    return reject(response);
  }

  // 成功の場合、期待されるスキーマに対して結果をパースしてresolve!
  try {
    const result = resultSchema.parse(response.result);
    resolve(result);
  } catch (error) {
    // サーバーのレスポンスが期待する形状と一致しない場合、それもエラー
    reject(error);
  }
});

サーバーからメッセージが到着すると、トランスポートのonmessageハンドラがそれをProtocolクラスに渡し、トリアージナースとして機能します。メッセージにresultまたはerrorフィールドがある場合、それがレスポンスであることを知り、_onresponseを呼び出します。この関数は罠のもう半分です:レスポンスからIDを取得し、_responseHandlers内の対応するハンドラを見つけ、それを発動させてPromiseを履行します。

実行中のキャンセルを優雅に処理

ユーザーが我慢できなくなって長時間実行の操作をキャンセルしたい場合はどうなるでしょうか?プロトコルには、標準のAbortSignalを使用してこれをクリーンに処理する方法があります。

  1. クライアントアプリケーションがAbortSignalをトリガー
  2. request()メソッドがこれをキャッチし、Promiseをローカルでrejectし、重要なことに、notifications/cancelledメッセージをサーバーに送信
  3. サーバーのProtocolインスタンスには、この通知専用に事前登録されたハンドラがあり、タスクのAbortController(リクエストが最初に到着したときに保存したもの)を検索して.abort()を呼び出し、サーバー側コードに作業を停止するよう通知

タイムアウトで長時間実行タスクを維持

リクエストが永遠にハングするのを防ぐため、Protocolクラスにはスマートなタイムアウトシステムがあります。リクエストが行われると、タイマーを開始します。しかし、真の魔法はresetTimeoutOnProgressオプションにあります。長時間のAIタスクの場合、時間がかかっているだけでタイムアウトさせたくありません。このオプションがtrueの場合、サーバーが進捗通知を送信するたびに、クライアントはタイムアウトタイマーをリセットします。これにより、サーバーが生存の兆候を示している限り、クライアントは辛抱強く待ちます。

// ソース: ./guides/typescript-sdk/src/shared/protocol.ts

// このメソッドは進捗通知が到着したときに呼び出される
private _resetTimeout(messageId: number): boolean {
  const info = this._timeoutInfo.get(messageId);
  if (!info) return false;

  // 永遠に延長できないよう`maxTotalTimeout`に対してもチェック
  const totalElapsed = Date.now() - info.startTime;
  if (info.maxTotalTimeout && totalElapsed >= info.maxTotalTimeout) {
    this._timeoutInfo.delete(messageId);
    throw new McpError(/* ... */);
  }

  // 古いタイマーをクリアして新しいものを開始!
  clearTimeout(info.timeoutId);
  info.timeoutId = setTimeout(info.onTimeout, info.timeout);
  return true;
}

パート2:クライアントの視点 - 接続の確立

では、client/streamableHttp.tsの具体的なクライアント実装に飛び込みましょう。

クライアントハンドシェイク:接続、初期化、認証

クライアントの完全な接続への旅は精密なダンスです:

  1. initialize POST:クライアントが最初に行うのは、initializeメッセージをPOSTすることです。これは、クライアントが自分が誰で何ができるかをサーバーに伝える正式なハンドシェイクです。

  2. 202 Acceptedトリガー:サーバーはHTTP 202 Acceptedで応答します。これがシグナルです!クライアントのsend()メソッドはこれを見て、すぐに第2チャネルを開く時だと知ります。

  3. 非同期GET:クライアントはすぐに_startOrAuthSse()を呼び出し、Accept: text/event-streamヘッダー付きの長時間GETリクエストを発射します。これは、クライアントがサーバーのPAシステムに耳を傾けるために開くものです。サーバーがこれをサポートしていない場合(405 Method Not Allowedを返す)、クライアントは優雅にそれなしで続行します。

  4. 認証フロー:サーバーが401 Unauthorizedで応答した場合、クライアントのauthProviderが起動します。トークンをリフレッシュしようとするか、認証情報がない場合はredirectToAuthorizationをトリガーし、ユーザーをログインに送ります。ユーザーが戻ったら、アプリケーションはfinishAuth()を呼び出してOAuth2フローを完了し、接続を再試行するために必要なトークンを取得します。

クライアントのゲートウェイ:send()メソッドの詳細分析

クライアントが送信するすべてのメッセージは、POST経由でsend()メソッドを通過します。クライアントの真の天才は、このPOSTへのレスポンスをどう解釈するかにあります。

  • ステータスが202 Acceptedの場合:これは「メッセージを受信しました、ありがとう」というシグナルです。メッセージがinitializeだった場合、これは上で見たようにSSE GETストリームを開始するキューです。

  • ステータスが200 OKでContent-Typeがapplication/jsonの場合:これはシンプルな同期スタイルのレスポンスです。クライアントはJSONをパースし、このトランザクションは完了です。

  • ステータスが200 OKでContent-Typeがtext/event-streamの場合:ここが本当にクールです。POSTリクエストのレスポンス自体がストリームです。クライアントはこのストリームを_handleSseStreamにパイプして、その特定リクエストの進捗更新と最終結果を処理します。

このロジックがクライアントの柔軟性の心臓部です。

// ソース: ./guides/typescript-sdk/src/client/streamableHttp.ts

// send()内のこのブロックは、サーバーのレスポンスに基づいて何をすべきかを決定
if (response.status === 202) {
  // サーバーが初期化を受け入れた場合...
  if (isInitializedNotification(message)) {
    // ...一般アナウンス(GET)チャネルを開く時です!
    this._startOrAuthSse({ resumptionToken: undefined }).catch(err => this.onerror?.(err));
  }
  return;
}

const contentType = response.headers.get("content-type");

if (hasRequests) {
  if (contentType?.includes("text/event-stream")) {
    // POSTレスポンスはストリーム!それに応じて処理
    this._handleSseStream(response.body, { onresumptiontoken }, false);
  } else if (contentType?.includes("application/json")) {
    // POSTレスポンスはシンプルなJSONオブジェクト。パース
    const data = await response.json();
    // ... データを処理 ...
  }
}

バイトからメッセージへ:_handleSseStreamでストリームをパース

このメソッドは、メインGETまたはストリーミングPOSTからのすべてのSSEストリーム用の指定パーサーです。美しく、モダンなストリーム処理パイプラインを設定します:

生のバイト(ReadableStream)→デコードされたテキスト(TextDecoderStream)→パースされたイベント(EventSourceParserStream)

次に、このパイプラインの終端から読み取り、event.data(JSONペイロード)を取得し、パースし、ルーティングのためにProtocol層のメインonmessageコールバックに戻します。シンプル、効率的、ノンブロッキングです。

// ソース: ./guides/typescript-sdk/src/client/streamableHttp.ts

// これはJavaScriptのモダンなストリーム処理の傑作
const reader = stream
  .pipeThrough(new TextDecoderStream())
  .pipeThrough(new EventSourceParserStream())
  .getReader();

カオスを乗り越える:セッション管理と再開可能性 🛡️

このプロトコルが真に輝きを放つのは、ステートフルな状態管理とネットワーク障害からの復旧機能においてです。

**セッション管理:**クライアントは最初のレスポンスからmcp-session-idヘッダーを取得して保存します。それ以降のすべてのリクエストにこのヘッダーを含めることで、サーバーに「また私です」と伝えます。

**接続の再開可能性:**これは耐障害性を実現するための重要なフローです。

**トークンの取得:**ストリーム処理中、SSEイベントにidフィールドがあれば、それが再開トークンです!クライアントはこれをlastEventIdとしてキャプチャし、onresumptiontokenコールバックを呼び出して、アプリケーションが安全な場所(例えばlocalStorage)に保存できるようにします。

**切断の検出:**ネットワークが切断されると、ストリームがエラーを返します。_handleSseStreamcatchブロックがトリガーされます。

**再接続のスケジュール:**諦める代わりに、catchブロックが_scheduleReconnectionを呼び出し、指数バックオフ遅延を使用して次の試行を計画します。

**再開の試行:**遅延後、_startOrAuthSseを再度呼び出しますが、今回は保存しておいたlastEventIdを渡します。

マジックヘッダーの送信:_startOrAuthSseは新しいGETリクエストを作成しますが、今度はトークンを含む特別なlast-event-idヘッダーを付けます。これにより、サーバーはクライアントがどこで中断したかを正確に把握し、見逃したメッセージを再生できます。

これは接続障害から回復するための完全なクローズドループシステムです。

パート3:サーバー側の物語 🔧

それでは視点を変えて、server/streamableHttp.tsのサーバー実装を見てみましょう。

サーバーの正面玄関:handleRequestとトランスポートライフサイクル

simpleStreamableHttp.tsのサンプルサーバーは、ステートフルな接続を管理するための美しいパターンを示しています。グローバルなtransportsマップを維持します。

リクエストが来ると、mcp-session-idヘッダーをチェックします。

IDがマップに存在する場合、そのセッション用の既存のStreamableHTTPServerTransportインスタンスを再利用します。状態が維持されます!

IDが存在せず、メッセージがinitializeの場合、新しいクライアントが接続していることを認識します。新しいトランスポートインスタンスを作成します。ポイントはonsessioninitializedコールバックです:新しいトランスポートがセッションIDを生成すると、このコールバックが発火し、新しいトランスポートをグローバルマップに保存します。

このロジックが、サーバーが複数の異なるクライアントセッションを同時に管理する方法の核心です。

// From: ./guides/typescript-sdk/src/examples/server/simpleStreamableHttp.ts

// サーバー例からのこのロジックが、ステートフルなセッション管理の鍵です。

if (sessionId && transports[sessionId]) {
  // 既存のセッションが見つかったので、そのトランスポートを再利用します。
  transport = transports[sessionId];
} else if (!sessionId && isInitializeRequest(req.body)) {
  // 新しいクライアントが初期化中です!
  transport = new StreamableHTTPServerTransport({
    sessionIdGenerator: () => randomUUID(),
    // このコールバックが魔法の接着剤です。新しいセッションIDをそのトランスポートインスタンスに結びつけます。
    onsessioninitialized: (newlyCreatedSessionId) => {
      console.log(`Session initialized with ID: ${newlyCreatedSessionId}`);
      transports[newlyCreatedSessionId] = transport;
    }
  });
  // ...
}

インテリジェントルーティング:サーバーのsend()メソッド

サーバーのsend()メソッドはクライアントのミラーイメージであり、送信メッセージを正しいチャネルにルーティングする責任があります。決定要因はrelatedRequestIdです。

ケース1:一般的なアナウンスメント。send()relatedRequestIdなしの通知に対して呼び出されると、サーバーはそれがサーバー主導の一般的なイベントであることを認識します。長期間存続するGETストリーム用のServerResponseオブジェクトを探し出し、そこにメッセージを書き込みます。

ケース2:特定のレスポンス。send()がレスポンスである(idを持つ)メッセージまたはrelatedRequestIdを持つメッセージに対して呼び出されると、サーバーはそれが特定のPOSTトランザクションに属していることを認識します。内部マッピング(_requestToStreamMapping)を使用して、元のPOSTに関連付けられた正確なServerResponseオブジェクトを見つけ、その専用ストリームにメッセージを書き込みます。

これにより、ツールAの進捗更新が誤ってツールBのレスポンスストリームに送信されることを防ぎます。

武器の選択:ストリーミングSSE vs 同期JSON

サーバーは、enableJsonResponseオプションによって制御される2つの方法で応答できます。

**ストリーミングSSE(デフォルト):**POSTが到着すると、サーバーは即座にtext/event-streamコンテンツタイプで200 OKを返します。接続は開かれたストリームとなり、サーバーは利用可能になった時点でイベントを送信できます。

同期JSON:enableJsonResponsetrueの場合、サーバーは待機します。すぐには応答を送信しません。リクエストバッチのすべての結果をメモリにバッファリングします。バッチ全体が完了して初めて、application/jsonコンテンツタイプと完全なJSONペイロードを含む単一の200 OKを送信します。これはストリーミングが不要なシンプルなツールに最適です。

中断したところから再開:EventStore 💾

サーバー側の再開可能性機能は、EventStoreインターフェースによって実現されています。

イベントの保存:send()メソッド内で、イベントストアが設定されている場合、サーバーはまずeventStore.storeEvent()を呼び出します。これによりメッセージが保存され、一意のeventIdが返されます。このIDは、クライアントへのSSEメッセージ内でid:フィールドとして送信されます。

**イベントの再生:**クライアントがlast-event-idヘッダー付きで再接続すると、サーバーのhandleGetRequestメソッドがそれをキャッチします。次にeventStore.replayEventsAfter()を呼び出し、クライアントが見逃したすべてのメッセージを取得して新しい接続経由で送信し、クライアントの状態をシームレスに復元します。

パート4:実際に動かす:実践的なシナリオ 🚀

では、これで実際に何が構築できるのでしょうか?

長時間実行されるAIツール:「リサーチエージェント」ツールを構築しているとします。ユーザーがトピックを与えます。POSTリクエストが送信されます。サーバーは専用のレスポンスストリーム上で更新をストリーミングできます:{"status": "Webを検索中..."}{"status": "10のソースを発見、要約中..."}{"status": "レポート生成中..."}、その後に最終的なテキストが続きます。長いタスクがインタラクティブになります。

**インタラクティブなユーザー入力(引き出し):**AIがファイルへのアクセス許可をユーザーに求める必要があるとします。一般アナウンスメント(GET)チャネルを介してelicitInputリクエストを送信できます。クライアントアプリがこれを検出し、ネイティブの「アクセスを許可しますか?」ダイアログをポップアップ表示し、はい/いいえの回答をサーバーに送り返します。これは流動的な双方向会話です。

**リアルタイムダッシュボード:**システムリソースを監視するサーバーを想像してください。サーバーは、GETストリーム経由で接続された複数のクライアントダッシュボードを持つことができます。CPU使用率が変化するたびに、サーバーはcpu_usage_changed通知をsend()するだけで、接続されているすべてのダッシュボードがリアルタイムで更新されます。

セッション管理パターンの比較表

特徴 レガシーHTTP+SSE モダンStreamable HTTP
セッションID管理 アプリケーションレベル プロトコルレベル(ヘッダー統合)
再開可能性 なし ビルトイン(last-event-id)
双方向通信 複雑(複数のチャネル) 統一された専用ストリーム
エラーハンドリング 手動実装が必要 指数バックオフ付き自動再接続
セッション永続化 カスタムストレージ EventStoreインターフェース
トランスポート抽象化 低レベル 高レベル、統合API

最終的な考察:デザインの進化 ✨

レガシーHTTP+SSEプロトコルからモダンなStreamable HTTPプロトコルへの旅は、ソフトウェア進化の完璧なケーススタディです。レガシープロトコルは、当時の標準的なWeb技術で可能な限界を押し広げた巧妙なソリューションでした。機能しましたが、アーキテクチャ上の継ぎ目がありました—アプリケーションレベルのセッションマップの必要性、再開可能性の欠如、不格好なハンドシェイクです。

モダンなStreamable HTTPプロトコルは、これらの継ぎ目から学んだ直接的な結果です。HTTPの性質により整合するようにフローを再構想し、統一された、より強力で、はるかに回復力のあるシステムを作り出しました。これは、優れたエンジニアリングとは単に物事を動作させることだけでなく、堅牢で、エレガントで、その上に構築するのが楽しくなるまで洗練することだという証です。

したがって、次にリアルタイムトランスポートを選択するときは、違いを覚えておいてください。シンプルな通知をプッシュするだけなら、クラシックSSEが友達です。しかし、次世代のインタラクティブで、回復力があり、インテリジェントなアプリケーションを構築しているなら、Streamable HTTPのようなプロトコルが、あなたが立ちたい強力な基盤です。

たくさんの情報を受け取ったことは理解していますが、このディープダイブがMCP Streamable HTTPプロトコルの背後にある魔法を解明し、レガシーSSE前身との関係を明確にしたことを願っています。コードと設計の選択を理解することで、あなたはその全パワーを活用する準備ができました。

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?