はじめに
従来Webアプリでサーバーとクライアントと2方向通信を実現するには、WebSocketを用いられていますが、最近ChatGPTを中心とした対話型AIはServer-Sent Events(SSE)を使って同等な機能を実現しています。
また、つい最近脚光を浴びているAIモデルが外部のツールとの通信プロトコルModel Context Protocol(MCP)も、SSE技術を用いられています。
実装
Server
import express from 'express';
import { encode } from 'eventsource-encoder';
import { randomBytes } from 'node:crypto';
const app = express();
const port = 3000;
// セッション保存用
const clients = {};
app.use(express.json()); // JSON形式のリクエストボディをパースするために必要
import cors from 'cors'; // CORS対策 (開発用)
app.use(cors()); // CORS対策 (開発用)
// 受信用エンドポイント、SSE (Server-Sent Events) を使用
app.get('/downward', (_req, res) => {
const sessionId = randomBytes(8).toString('hex'); // セッションIDを生成
res.set({ // レスポンスヘッダーの設定
'Content-Type': 'text/event-stream', // SSE特有のContent-Type
'Cache-Control': 'no-cache', // キャッシュを無効化
'Connection': 'keep-alive', // 接続を維持
});
clients[sessionId] = res; // クライアントのレスポンスオブジェクトを保存
console.log(`Client connected with session ID: ${sessionId}`);
// 初回接続時にウェルカムメッセージを送信
res.write(encode({
id: randomBytes(16).toString('base64'), // ランダムなメッセージIDを生成
event: 'welcome',
data: JSON.stringify({
sessionId, // セッションIDをクライアントに送信
text: 'Welcome to the chat!',
timestamp: new Date().toLocaleTimeString(),
}),
}));
res.on('close', () => {
delete clients[sessionId]; // クライアントが切断されたらセッションを削除
console.log(`Client disconnected with session ID: ${sessionId}`);
});
});
// 送信用エンドポイント、通常のPOSTリクエストを使用
app.post('/upward', (req, res) => {
console.log('Received POST request to /upward', req.body);
console.log('headers:', req.headers);
const { text } = req.body;
if (!text) {
return res.status(400).json({ error: 'Text are required' });
}
const sessionId = req.header('X-Session-ID'); // セッションIDをヘッダーから取得
const client = clients[sessionId]; // セッションIDに対応するクライアントを取得
if (!sessionId || !client) {
return res.status(400).json({ error: 'Invalid session' });
}
console.log(`Received message from session ID: ${sessionId}, text: ${text}`);
// エコーメッセージを送信
client.write(encode({
id: randomBytes(16).toString('base64'), // ランダムなメッセージIDを生成
// event: 'message', // イベント名を省略した場合は `message` がデフォルト
data: JSON.stringify({
text: `Data received: ${text}`,
timestamp: new Date().toLocaleTimeString(),
}),
}));
res.status(201).json({ status: 'Message sent successfully' });
});
app.listen(port, () => {
console.log(`Server is running at http://localhost:${port}`);
});
サーバーの解説
SSEのエンドポイントを作るには、通常のGET
やPOST
などのRequestのヘッダーに 'Content-Type': 'text/event-stream'
及び'Connection': 'keep-alive'
を入れれば、Responseは継続化されて、サーバーの都合で送信することができます。
ただし、このResponseのハンドラーは送信しかできないので、クライアントからの受信は通常のPOST
アクセスなどで受信することとなります。2方向通信を実現するには、送信と受信が2組異なるの通信路を使用しているので、手動でセッションを管理する必要があります。
また、送信メッセージを eventsource-encoder を使って整形していますが、手動で以下の形式に整形しても全く問題ありません。
encode({ id: '12345', event: 'my-event', data: JSON.stringify({ foo: 42 }) })
=> 'event: my-event\nid: 12345\ndata: {"foo":42}\n\n'
上記送信内容の中、 data: ... \n\n
は必須で、event: ... \n
およびid: ... \n
はオプションです。
Client
ブラウザJS
// 受信用エンドポイントとSSE接続を確立
const eventSource = new EventSource('http://localhost:3000/downward');
let sessionId = null;
eventSource.addEventListener('message', (e) => {
const { text, timestamp } = JSON.parse(e.data);
console.log('Message received:', text, 'at', timestamp);
});
eventSource.addEventListener('welcome', (e) => {
const { sessionId: id, text, timestamp } = JSON.parse(e.data);
sessionId = id; // セッションIDを保存
console.log('Welcome message received:', text, 'at', timestamp);
});
eventSource.addEventListener('error', (e) => {
console.error('Error occurred:', e);
});
async function sendMessage(text) {
try {
const response = await fetch('http://localhost:3000/upward', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Session-ID': sessionId, // セッションIDをヘッダーに追加
},
body: JSON.stringify({ text }),
});
if (!response.ok) {
throw new Error('Network response was not ok');
}
const data = await response.json();
console.log('Message sent successfully:', data);
} catch (error) {
console.error('Error sending message:', error);
}
}
Node.js
import { EventSource } from 'eventsource'
import { createInterface } from "readline/promises";
// 受信用エンドポイントとSSE接続を確立
const client = new EventSource('http://localhost:3000/downward');
let sessionId = null;
// readlineインターフェースを作成
const rl = createInterface({
input: process.stdin,
output: process.stdout
});
rl.setPrompt(`Send Message: \n> `);
// 一般メッセージの受信
client.addEventListener('message', (e) => {
const { text, timestamp } = JSON.parse(e.data);
console.log('Message received:', text, 'at', timestamp);
});
// ウェルカムメッセージの受信
client.addEventListener('welcome', (e) => {
const { sessionId: id, text, timestamp } = JSON.parse(e.data);
sessionId = id; // セッションIDを保存
console.log('Welcome message received:', text, 'at', timestamp);
});
client.addEventListener('error', (e) => {
console.error('Error occurred:', e);
});
rl.on('line', async input => {
input = input.trim();
if (!input) {
rl.prompt();
return;
}
await sendMessage(input);
rl.prompt();
});
rl.on('SIGINT', async () => {
console.log("\nCaught interrupt signal (Ctrl+C). Exiting...");
rl.close();
client.close();
});
async function sendMessage(text) {
try {
const response = await fetch('http://localhost:3000/upward', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Session-ID': sessionId, // セッションIDをヘッダーに追加
},
body: JSON.stringify({ text }),
});
if (!response.ok) {
throw new Error('Network response was not ok');
}
const data = await response.json();
console.log('Message sent successfully:', data);
} catch (error) {
console.error('Error sending message:', error);
}
}
クライアントの解説
SSEサーバーへ接続するには、ブラウザの EventSource インターフェイスまたはNode.js 向けの EventSource のポリフィル を使うと便利です。
EventSourceが使えない環境でも、下記のようにResponseを直接Parseすることも可能です。
const response = await fetch('http://localhost:3000/downward', {
method: 'GET',
headers: {
'Accept': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
});
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
while (true) {
const { done, value } = await reader.read();
if (done) {
break; // Stream has ended
}
let buffer = '';
while (buffer.indexOf('\n\n') === -1) {
buffer += decoder.decode(value, { stream: true });
}
let match;
if (match = buffer.match(/event:(.+?)\n/)) {
console.log('Event:', match[1]);
}
if (match = buffer.match(/data:(.+?)\n/)) {
console.log('Data:', match[1]);
}
}
WebSocketとの比較
特徴 | WebSocket | Server-Sent Events (SSE) |
---|---|---|
通信方向 | 双方向 (全二重) | 単方向 (サーバーからクライアントへ) |
プロトコル | HTTP上で確立される独立したプロトコル (ws:// または wss:// ) |
HTTP/1.1上で動作し、HTTPヘッダーとMIMEタイプtext/event-stream を使用 |
接続の種類 | 永続的な単一のTCP接続 | 永続的な単一のHTTP接続 |
データ形式 | 任意のバイナリまたはテキストデータ | UTF-8エンコードされたプレーンテキスト(data: ... 形式) |
エラー処理 | 組み込みのping/pong、接続クローズ通知など | 接続切断時の自動再接続メカニズム |
実装の複雑さ | クライアント・サーバー双方で複雑な実装が必要な場合がある | クライアント側は比較的シンプル |
ユースケース | リアルタイムチャット、オンラインゲーム、共同編集ツールなど、双方向通信が頻繁に必要なアプリケーション | リアルタイム通知、ニュースフィード、株価表示、AIチャット(サーバーからの連続データ更新)など、サーバーからの一方的なプッシュが主なアプリケーション |
ブラウザ対応 | 広くサポートされている | 主要なモダンブラウザでサポートされている (IE非対応) |
オーバーヘッド | 確立後のオーバーヘッドは小さい | 各イベントにHTTPヘッダーが付与されるため、少量のオーバーヘッドがある |
HTTP/2の恩恵 | 直接は関係しないが、HTTP/2がベースとなる技術ではない | HTTP/2の多重化により、単一の接続で複数のSSEストリームを効率的に扱うことが可能 |
セキュリティ |
wss:// を使用することでTLS/SSLによる暗号化が可能 |
https:// を使用することでTLS/SSLによる暗号化が可能 |
接続の維持 | クライアントとサーバー間で定期的なping/pongメッセージを送信して接続を維持 | サーバーは定期的にコメント行(: )を送信して接続を維持 |
サーバーの負荷 | 双方向通信のため、サーバーはクライアントからのリクエストを頻繁に処理する必要がある | サーバーは一方向のデータ送信のみを行うため、負荷は比較的低い |
クライアントの実装 | JavaScriptのWebSocket APIを使用して実装可能 |
JavaScriptのEventSource APIを使用して実装可能 |
接続の再確立 | 接続が切断された場合、クライアントは自動的に再接続を試みることができる | 接続が切断された場合、EventSource は自動的に再接続を試みる |
メッセージの順序 | メッセージは送信された順序で受信されるが、双方向通信のため、クライアントからのメッセージが先に到着する可能性もある | メッセージはサーバーからクライアントへ送信されるため、順序は保証される |
データの圧縮 | WebSocketはデータ圧縮をサポートしている(例:permessage-deflate 拡張) |
SSEはデータ圧縮をサポートしていないが、HTTP圧縮(gzipなど)を利用可能 |
あと特筆すべきのは、筆者が開発中、PythonとRubyのWebSocketライブラリは相互接続する際に、時々エラーが発生しています。
それと比べると、SSEは本質上HTTPなので、異なる構成のサーバー間通信に活用できるかと考えています。