0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Server-Sent Events(SSE)の2方向通信Server&Clientの最小構成

Posted at

はじめに

従来Webアプリでサーバーとクライアントと2方向通信を実現するには、WebSocketを用いられていますが、最近ChatGPTを中心とした対話型AIはServer-Sent Events(SSE)を使って同等な機能を実現しています。

また、つい最近脚光を浴びているAIモデルが外部のツールとの通信プロトコルModel Context Protocol(MCP)も、SSE技術を用いられています。

実装

Server

server.mjs
import express from 'express';
import { encode } from 'eventsource-encoder';
import { randomBytes } from 'node:crypto';

const app = express();
const port = 3000;

// セッション保存用
const clients = {};

app.use(express.json()); // JSON形式のリクエストボディをパースするために必要

import cors from 'cors'; // CORS対策 (開発用)
app.use(cors()); // CORS対策 (開発用)

// 受信用エンドポイント、SSE (Server-Sent Events) を使用
app.get('/downward', (_req, res) => {
  const sessionId = randomBytes(8).toString('hex'); // セッションIDを生成

  res.set({ // レスポンスヘッダーの設定
    'Content-Type': 'text/event-stream', // SSE特有のContent-Type
    'Cache-Control': 'no-cache', // キャッシュを無効化
    'Connection': 'keep-alive', // 接続を維持
  });

  clients[sessionId] = res; // クライアントのレスポンスオブジェクトを保存
  console.log(`Client connected with session ID: ${sessionId}`);

  // 初回接続時にウェルカムメッセージを送信
  res.write(encode({
    id: randomBytes(16).toString('base64'), // ランダムなメッセージIDを生成
    event: 'welcome',
    data: JSON.stringify({
      sessionId, // セッションIDをクライアントに送信
      text: 'Welcome to the chat!',
      timestamp: new Date().toLocaleTimeString(),
    }),
  }));

  res.on('close', () => {
    delete clients[sessionId]; // クライアントが切断されたらセッションを削除
    console.log(`Client disconnected with session ID: ${sessionId}`);
  });
});

// 送信用エンドポイント、通常のPOSTリクエストを使用
app.post('/upward', (req, res) => {
  console.log('Received POST request to /upward', req.body);
  console.log('headers:', req.headers);

  const { text } = req.body;
  if (!text) {
    return res.status(400).json({ error: 'Text are required' });
  }

  const sessionId = req.header('X-Session-ID'); // セッションIDをヘッダーから取得
  const client = clients[sessionId]; // セッションIDに対応するクライアントを取得
  if (!sessionId || !client) {
    return res.status(400).json({ error: 'Invalid session' });
  }

  console.log(`Received message from session ID: ${sessionId}, text: ${text}`);

  // エコーメッセージを送信
  client.write(encode({
    id: randomBytes(16).toString('base64'), // ランダムなメッセージIDを生成
    // event: 'message', // イベント名を省略した場合は `message` がデフォルト
    data: JSON.stringify({
      text: `Data received: ${text}`,
      timestamp: new Date().toLocaleTimeString(),
    }),
  }));

  res.status(201).json({ status: 'Message sent successfully' });
});

app.listen(port, () => {
  console.log(`Server is running at http://localhost:${port}`);
});

サーバーの解説

SSEのエンドポイントを作るには、通常のGETPOSTなどのRequestのヘッダーに 'Content-Type': 'text/event-stream'及び'Connection': 'keep-alive'を入れれば、Responseは継続化されて、サーバーの都合で送信することができます。

ただし、このResponseのハンドラーは送信しかできないので、クライアントからの受信は通常のPOSTアクセスなどで受信することとなります。2方向通信を実現するには、送信と受信が2組異なるの通信路を使用しているので、手動でセッションを管理する必要があります。

また、送信メッセージを eventsource-encoder を使って整形していますが、手動で以下の形式に整形しても全く問題ありません。

encode({ id: '12345', event: 'my-event', data: JSON.stringify({ foo: 42 }) })

=> 'event: my-event\nid: 12345\ndata: {"foo":42}\n\n'

上記送信内容の中、 data: ... \n\n は必須で、event: ... \nおよびid: ... \nはオプションです。

Client

ブラウザJS

client.js

// 受信用エンドポイントとSSE接続を確立
const eventSource = new EventSource('http://localhost:3000/downward');
let sessionId = null;

eventSource.addEventListener('message', (e) => {
  const { text, timestamp } = JSON.parse(e.data);
  console.log('Message received:', text, 'at', timestamp);
});

eventSource.addEventListener('welcome', (e) => {
  const { sessionId: id, text, timestamp } = JSON.parse(e.data);
  sessionId = id; // セッションIDを保存
  console.log('Welcome message received:', text, 'at', timestamp);
});

eventSource.addEventListener('error', (e) => {
  console.error('Error occurred:', e);
});

async function sendMessage(text) {
  try {
    const response = await fetch('http://localhost:3000/upward', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'X-Session-ID': sessionId, // セッションIDをヘッダーに追加
      },
      body: JSON.stringify({ text }),
    });

    if (!response.ok) {
      throw new Error('Network response was not ok');
    }

    const data = await response.json();
    console.log('Message sent successfully:', data);
  } catch (error) {
    console.error('Error sending message:', error);
  }
}

Node.js

client.mjs
import { EventSource } from 'eventsource'
import { createInterface } from "readline/promises";

// 受信用エンドポイントとSSE接続を確立
const client = new EventSource('http://localhost:3000/downward');
let sessionId = null;

// readlineインターフェースを作成
const rl = createInterface({
  input: process.stdin,
  output: process.stdout
});
rl.setPrompt(`Send Message: \n> `);

// 一般メッセージの受信
client.addEventListener('message', (e) => {
  const { text, timestamp } = JSON.parse(e.data);
  console.log('Message received:', text, 'at', timestamp);
});

// ウェルカムメッセージの受信
client.addEventListener('welcome', (e) => {
  const { sessionId: id, text, timestamp } = JSON.parse(e.data);
  sessionId = id; // セッションIDを保存
  console.log('Welcome message received:', text, 'at', timestamp);
});

client.addEventListener('error', (e) => {
  console.error('Error occurred:', e);
});

rl.on('line', async input => {
  input = input.trim();
  if (!input) {
    rl.prompt();
    return;
  }

  await sendMessage(input);

  rl.prompt();
});

rl.on('SIGINT', async () => {
  console.log("\nCaught interrupt signal (Ctrl+C). Exiting...");
  rl.close();
  client.close();
});

async function sendMessage(text) {
  try {
    const response = await fetch('http://localhost:3000/upward', {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'X-Session-ID': sessionId, // セッションIDをヘッダーに追加
      },
      body: JSON.stringify({ text }),
    });

    if (!response.ok) {
      throw new Error('Network response was not ok');
    }

    const data = await response.json();
    console.log('Message sent successfully:', data);
  } catch (error) {
    console.error('Error sending message:', error);
  }
}

クライアントの解説

SSEサーバーへ接続するには、ブラウザの EventSource インターフェイスまたはNode.js 向けの EventSource のポリフィル を使うと便利です。

EventSourceが使えない環境でも、下記のようにResponseを直接Parseすることも可能です。

lowlevel-client.js
const response = await fetch('http://localhost:3000/downward', {
  method: 'GET',
  headers: {
    'Accept': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  }
});

const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');

while (true) {
  const { done, value } = await reader.read();
  if (done) {
    break; // Stream has ended
  }

  let buffer = '';
  while (buffer.indexOf('\n\n') === -1) {
    buffer += decoder.decode(value, { stream: true });
  }

  let match;
  if (match = buffer.match(/event:(.+?)\n/)) {
    console.log('Event:', match[1]);
  }

  if (match = buffer.match(/data:(.+?)\n/)) {
    console.log('Data:', match[1]);
  }
}

WebSocketとの比較

特徴 WebSocket Server-Sent Events (SSE)
通信方向 双方向 (全二重) 単方向 (サーバーからクライアントへ)
プロトコル HTTP上で確立される独立したプロトコル (ws:// または wss://) HTTP/1.1上で動作し、HTTPヘッダーとMIMEタイプtext/event-streamを使用
接続の種類 永続的な単一のTCP接続 永続的な単一のHTTP接続
データ形式 任意のバイナリまたはテキストデータ UTF-8エンコードされたプレーンテキスト(data: ... 形式)
エラー処理 組み込みのping/pong、接続クローズ通知など 接続切断時の自動再接続メカニズム
実装の複雑さ クライアント・サーバー双方で複雑な実装が必要な場合がある クライアント側は比較的シンプル
ユースケース リアルタイムチャット、オンラインゲーム、共同編集ツールなど、双方向通信が頻繁に必要なアプリケーション リアルタイム通知、ニュースフィード、株価表示、AIチャット(サーバーからの連続データ更新)など、サーバーからの一方的なプッシュが主なアプリケーション
ブラウザ対応 広くサポートされている 主要なモダンブラウザでサポートされている (IE非対応)
オーバーヘッド 確立後のオーバーヘッドは小さい 各イベントにHTTPヘッダーが付与されるため、少量のオーバーヘッドがある
HTTP/2の恩恵 直接は関係しないが、HTTP/2がベースとなる技術ではない HTTP/2の多重化により、単一の接続で複数のSSEストリームを効率的に扱うことが可能
セキュリティ wss://を使用することでTLS/SSLによる暗号化が可能 https://を使用することでTLS/SSLによる暗号化が可能
接続の維持 クライアントとサーバー間で定期的なping/pongメッセージを送信して接続を維持 サーバーは定期的にコメント行(:)を送信して接続を維持
サーバーの負荷 双方向通信のため、サーバーはクライアントからのリクエストを頻繁に処理する必要がある サーバーは一方向のデータ送信のみを行うため、負荷は比較的低い
クライアントの実装 JavaScriptのWebSocket APIを使用して実装可能 JavaScriptのEventSource APIを使用して実装可能
接続の再確立 接続が切断された場合、クライアントは自動的に再接続を試みることができる 接続が切断された場合、EventSourceは自動的に再接続を試みる
メッセージの順序 メッセージは送信された順序で受信されるが、双方向通信のため、クライアントからのメッセージが先に到着する可能性もある メッセージはサーバーからクライアントへ送信されるため、順序は保証される
データの圧縮 WebSocketはデータ圧縮をサポートしている(例:permessage-deflate拡張) SSEはデータ圧縮をサポートしていないが、HTTP圧縮(gzipなど)を利用可能

あと特筆すべきのは、筆者が開発中、PythonとRubyのWebSocketライブラリは相互接続する際に、時々エラーが発生しています。

それと比べると、SSEは本質上HTTPなので、異なる構成のサーバー間通信に活用できるかと考えています。

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?