0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

WebFluxでAIチャットみたいなレスポンスが随時表示されるUIを実装する

Posted at

人間を相手にしたチャットツールの場合、チャットの内容を入力し終えて、送信後にチャットの内容がまとめて見えるようになります。

一方で、ChatGPTやGeminiのようなAIチャットツールでは、まるでAIがタイピングしているかのように、レスポンスの内容が随時表示され、レスポンスが全く返ってこない待ち時間はほとんどありません。このような、レスポンスが随時表示されるようなチャットのUIを作ってみようと思ったので、実装して得られた学びのメモです。

きかっけ

チャットアプリのUI(Web)を開発する案件がありました。

案件概要

生成AIが組み込まれたAPIがあり、そのAPIとのやりとりをチャット画面として表示するWebのUIを開発する案件。(APIは依頼元の会社がすでに作成済み)

セキュリティの観点からAPIとのやりとりはサーバーサイドで対応して欲しいとのこと。つまり、開発対象はAIとのやりとりを表示するWebの部分と、外部APIとやりとりしてレスポンスを返すバックエンドの部分。

image.png

開発言語は何でも良いとのことだったのですが、あまり時間をかけずに使い慣れた技術でサクッと作ってしまおう、という流れになったので、別の案件でも使っていて比較的慣れている技術構成にしました。

  • 言語:Java
  • フレームワーク:SpringBoot
  • フロントエンド:Thymeleaf + Vue.js(CDNで導入)

とてもシンプルな要件だったので、実装はあまり凝ったことはせずにサクッと終わらせたのですが、時間に余裕があったので、先に説明したレスポンスが随時表示される仕組みも実装しながら、学んでみることにしました。

用語の整理

AIチャットのように、レスポンスが随時表示される仕組みを実装をするにあたって、必要になる技術と用語を整理します。

リアクティブシステム

外部からの入力に対して迅速かつ効率的に応答することを特徴とするシステム。
特定の技術の事ではなく、システム設計のアーキテクチャを指す用語。

特に、以下の4つの特性を持つシステムの事を指す。

  • 応答性
  • 耐障害性
  • 弾力性
  • メッセージ駆動

AIがタイピングしているかのように随時レスポンスが表示される仕組みは、レスポンスに遅延がほとんどないため、高い応答性を持つリアクティブシステムの設計思想で実装していると考えられる。

サーバープッシュ技術

WebやHTTPに関する技術は、基本的にクライアント(Webブラウザ)からサーバーに対してリクエストを送信し、サーバーからのレスポンスをクライアントで受け取ることを前提した技術が多い。しかし、Webアプリケーションの技術が発展してきたことで、サーバー側からクライアントにデータを送信する必要性が生まれてきた。
そのような、サーバーからクライアントにデータを送信する技術の総称が、サーバープッシュ技術。サーバープッシュ技術には後述するSSE、双方向通信を実現するWebSocketなどがある。サーバープッシュ技術を使うことで、リアクティブシステムを実現しやすくする。

Server-sent Events(SSE)

サーバープッシュ技術の1つ。データをチャンクと呼ばれる塊に分割して送信する。SSEではチャンクのことをイベントと呼ぶ。

SSEは以下の特徴がある。

  • Content-Typeヘッダでtext/event-streamが指定されるとSSEによる応答を表す
  • SSEを使う場合はConnectionヘッダでkeep-aliveを指定し、サーバーがレスポンスを返した後も通信が持続するようにする
  • イベントは<タグ>: <データ>となるペアから構成されるテキストで、空行がイベントの終わりを表す
  • タグには以下の4種類がある
    • id:イベントを識別する
    • event:イベントの種類
    • data
      • サーバーから通知される情報
      • UTF-8でエンコードされたテキストである必要がある
      • 1つのイベントの中で連続して出力可能
    • retry:再接続時の待ち時間

SSEを使えば、レスポンスを少しずつ遅延なく表示する仕組みの場合、サーバーからの通知があれば実現できそうなので、SSEで実現できる。

ブロッキングIOとノンブロッキングIO

IOの処理はブロッキングIOとノンブロッキングIOに分けることができる。
ブロッキングIOとは、処理が完了するまでスレッドが「待ち続ける」状態。
一方のノンブロッキングIOでは、処理が完了する前にスレッドは次の処理に進める。

例えば、node.jsではファイル読み込みのメソッドにブロッキング用とノンブロッキング用がある。

ブロッキングIO

const fs = require('fs');

console.log('1. 処理開始');

// ブロッキング(同期的にファイルを読み込む)
const data = fs.readFileSync('sample.txt', 'utf-8');
console.log('2. ファイル内容:', data);

console.log('3. 処理終了');

出力結果

1. 処理開始
2. ファイル内容: XXXX
3. 処理終了

ノンブロッキングIO

const fs = require('fs');

console.log('1. 処理開始');

// ノンブロッキング(非同期的にファイルを読み込む)
fs.readFile('sample.txt', 'utf-8', (err, data) => {
  if (err) throw err;
  console.log('2. ファイル内容:', data);
});

console.log('3. 処理終了');
1. 処理開始
3. 処理終了
2. ファイル内容: XXXX

リアクティブシステムでは、その性質上スレッドを消費せず効率的にレスポンスを送り続けられることが求められるため、実装する処理がノンブロッキングであること重要になる。

WebFlux

SpringBootでリアクティブシステムを作るためのフレームワーク。
詳しくは公式リファレンスを参照。

Fetch Streaming

フロントエンド(JavaScript)で、SSEをfetch + ReadableStreamの技術で実装する方法。
JavaScriptでSSEを実装するための専用APIとしてEventSourceがあるが、GETしか対応していないなどの制限がある。
fetch + ReadableStreamでは、POSTにも対応でき、必要であればSSE以外の通信にも対応することが可能で、何かと柔軟性が高い。
詳しい解説はMDNなどを参照。

ここまでのまとめ

ここまでをまとめると、AIがタイピングしているようなUIは、リアクティブシステムの思想で実装されていると考えられる。リアクティブシステム自体はWebに限定されているものではないが、Webでリアクティブシステムを実現するための技術としてサーバープッシュ技術がある。サーバープッシュ技術の一つとして、Server-sent Eventsがある。Server-sent Events(SSE)を実現するうえではノンブロッキングで処理することが必要。SpringBootでリアクティブなWebアプリケーションを作るためのフレームワークがWebFlux。WebFluxを使うことで、ノンブロッキングでSSEに対応した実装が簡単に実現可能。SSEの実装ではフロントエンド(JavaScript)の対応も必要で、Fetch Stremingで柔軟な実装をすることができる。

実装サンプル1. 固定の値を返す

とりあえず、極力シンプルな実装をしてみて、理解を深めることに。

実行結果

実装して、動かしてみた結果が以下です。
クリックするとバックエンドにリンクエストを投げ、レスポンスは1文字ずつ、50ミリ秒ごとデータが送られ、送られてきたデータを随時表示される実装にしています。

SSE DEMO.gif

この動作をするWebアプリをSpringBootで実装します。

プロジェクトの用意

Spring initializrからSpringBootのプロジェクトを作成。
依存関係にWebFluxを追加しておく。

後から追加してもOK.
Mavenプロジェクトの場合はpom.xmlに以下を追加。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

コントローラー

レスポンスを返すコントローラは以下。
SSEの実現方法を知ることが目的だったので、固定の文字列を分割して送信するだけの単純なものにしてます。

※実際の案件はJavaで実装しましたが、学習用に作り直したプロジェクトではKotlinにしました。
※エンドポイントを関数で定義する方法もありますが、今回は処理をシンプルにするためアノテーションによるマッピングにしています。

ReactiveRestController.kt
class ReactiveRestController {
    @GetMapping("/demo")
    fun reactiveDemo(): Flux<String> {
        val message = "Hello! This is a Server-sent Events example."
        val res = Flux.interval(Duration.ofMillis(50)) // 50ミリ秒ごとに発行
            .take(message.length.toLong()) // メッセージの長さ分
            .map { it -> message[it.toInt()].toString() } // 各文字を1つずつ
        return res
    }
}

戻り値の型をFlux<T>にします。ここではただの文字列にしたいのでStringにしていますが、
JSONにしたい場合はそれ用の型を定義して指定します。

フロントエンド

フロントのサンプルコードは以下。
あくまで学習用のサンプルなのでscript要素に直接JavaScriptを組み込んでいます。
実装を楽にするため、CDNでVueを導入。
JSONで受け取りたい場合はJSONにパースする処理などが別途必要です。

<!DOCTYPE html>
<html lang="ja">
<head>
  <meta charset="UTF-8" />
  <title>Reactive System Demo</title>
</head>
<body>
  <div id="app">
    <h1>SSE Demo</h1>
    <div class="btn-area">
      <button @click="sendRequest">
        click
      </button>
    </div>
    <div id="log">
      <div>{{ message }}</div>
    </div>
  </div>
  <script src="https://unpkg.com/vue@3/dist/vue.global.prod.js"></script>
  <script>
    const { createApp, ref } = Vue;
    createApp({
      setup() {
        const message = ref(''); // 画面に表示するメッセージ

        // SSEとして送られてきたデータをパースしてコンテンツのみを取得する
        function* parseSSE(chunkText, buffer) {
          buffer.push(chunkText);
          const joined = buffer.join('');
          const events = joined.split(/\r?\n\r?\n/);
          buffer.length = 0;

          const last = events.pop();
          if (last !== undefined) buffer.push(last);

          for (const ev of events) {
            for (const line of ev.split(/\r?\n/)) {
              if (line.startsWith("data:")) {
                yield line.slice(5);
              }
            }
          }
        }

        // リクエスト送信メソッド
        async function sendRequest() {
          message.value = '';

          try {
            const response = await fetch('/demo', {
              headers: { 'Accept': 'text/event-stream' } // SSEとしてデータを受信
            });

            const reader = response.body.getReader();
            const decoder = new TextDecoder('utf-8');
            const leftoverBuf = [];

            while (true) {
              const { value, done } = await reader.read();
              if (done) break;
              const chunkText = decoder.decode(value, { stream: true });

              for (const msg of parseSSE(chunkText, leftoverBuf)) {
                message.value += msg; // 受け取ったメッセージを結合していく
              }
            }
          } catch (err) {
            message.value = `エラー: ${err.message || err}`;
          }
        }

        return {
          message,
          callStream
        };
      }
    }).mount('#app');
  </script>
</body>
</html>

AIにも色々と助けてもらいながら実装しました。
フロントはちょっと複雑にも感じましたが、バックエンドはフレームワーク様様という感じで、かなり楽に実装できました。

実装サンプル2. 外部APIからのレスポンスをSSEとして返す

次は、外部のAPIと連携する場合、外部APIからのレスポンスをSSEのイベントとして受け取りたいケースを考えます。

Wiremockの用意

実際にAPIを作るのは手間なので、ここではWiremockで代用します。

ここではdocker-composeを使ってモックを立ち上げます。
プロジェクト配下に以下を用意。
ポート番号がアプリ側と被らないように調整します。

docker-compose.yaml
version: '3.8'

services:
  reactive-system-demo-wiremock:
    image: wiremock/wiremock:latest
    container_name: reactive-system-demo-wiremock
    ports:
      - "8081:8080"
    volumes:
      - ./wiremock:/home/wiremock

先の例と同じ結果になるように、Wiremockを使ったコードに置き換えます。

プロジェクト配下にwiremock/mappingsディレクトリを作成し、以下のjsonを任意のファイル名で作成します。

{
  "priority": 1,
  "request": {
    "method": "GET",
    "urlPath": "/api/demo"
  },
  "response": {
    "status": 200,
    "headers": {
      "Content-Type": "text/event-stream; charset=utf-8",
      "Cache-Control": "no-cache",
      "Connection": "keep-alive"
    },
    "body": "data: H\n\ndata: e\n\ndata: l\n\ndata: l\n\ndata: o\n\ndata: !\n\ndata:  \n\ndata: T\n\ndata: h\n\ndata: i\n\ndata: s\n\ndata:  \n\ndata: i\n\ndata: s\n\ndata:  \n\ndata: a\n\ndata:  \n\ndata: S\n\ndata: e\n\ndata: r\n\ndata: v\n\ndata: e\n\ndata: r\n\ndata: -\n\ndata: s\n\ndata: e\n\ndata: n\n\ndata: t\n\ndata:  \n\ndata: E\n\ndata: v\n\ndata: e\n\ndata: n\n\ndata: t\n\ndata: s\n\ndata:  \n\ndata: e\n\ndata: x\n\ndata: a\n\ndata: m\n\ndata: p\n\ndata: l\n\ndata: e\n\ndata: .\n\n",
    "chunkedDribbleDelay": {
      "numberOfChunks": 44,
      "totalDuration": 2200
    }
  }
}

bodyの行が長くなっていますが、実装サンプル1でレスポンスした文字列と同じ内容です。
また、chunkedDribbleDelayを設定することで、チャンクを分けて送信されるようにしています。

コントローラ

コントローラは以下のように修正することで、Mockから受け取ったデータをFluxとしてフロントにレスポンスすることができました。

@RestController
// WebClientConfigを別で定義しておく
class ReactiveRestController(private val builder: WebClient.Builder) {
    // デシリアライズするための型定義
    private val sseType =
        object : ParameterizedTypeReference<ServerSentEvent<String>>() {}

    private val client = builder
        .baseUrl("http://localhost:8081") // Wiremockサーバー
        .defaultHeader(HttpHeaders.ACCEPT, MediaType.TEXT_EVENT_STREAM_VALUE)
        .build()

    @GetMapping("/demo")
    fun streamChat(): Flux<String> {
        return client.get()
            .uri("/api/demo")
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(sseType) // Flux<ServerSentEvent<String>>
            .filter { it.data() != null }
            .map    { it.data()!! }
            .log("chat.data")
    }
}

同じエンドポイントのメソッドを修正したのでフロントエンドの修正は不要です。
Wiremockのコンテナを立ち上げ、アプリを再起動して実行すると、先ほどと同じ動きになります。

SSE DEMO.gif

まとめ

SpringBootでのWebアプリ開発はそこそこやっていたのですが、これまでSpring MVCを使ったWebアプリしか作ったことがなく、Spring WebFluxは今回初めて触れました。
リアクティブシステムやサーバープッシュ技術(SSEなど)の概念も今まで実装で触れたことがなかったので、難しくも感じましたが、色々と勉強にもなりました。

次はDBも使用した形のリアクティブシステムのサンプルも作ってみたい。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?