本記事ではSSE(Server Sent Events)方式でサーバからクライエントにデータをプッシュするWebAPIを作成したいと思います。
SSEはWHATWGでも標準化されているサーバプッシュ技術の1つです1。最近だとOpenAI APIで使われているのが(自分の中で)話題になりました2。ChatGPTの回答が1文字1文字連続で表示されていくあの動作を実現しています。
JerseyがSSE(Server Sent Events)方式に対応していましたので3、Dropwizard+Jerseyによる実装を紹介します。
実装内容
プロジェクトの土台は次のMaven Archetypesから作成しています。
Dropwizard Archetype For Simple Java Services
ライブラリのインポート
pom.xml
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-sse</artifactId>
<version>3.1.2</version>
</dependency>
ユニキャスト
ユニキャスト用リソースでは、接続している特定の1つのクライアントに対してだけ、サーバからデータを送信します。
以下のコードは、エンドポイント /event/new
に対してGETしてきたクライエントに対して、1秒ごとに1つメッセージを送ることを5回繰り返します。
@Path("events")
public class SseResource {
@GET
@Path("/new")
@Produces(MediaType.SERVER_SENT_EVENTS) // レスポンスのContent-Typeはtext/event-streamとする
// 引数のeventSinkとsseはリクエスト実行毎にjerseyのフレームワーク側で依存性注入してくれる
public void getServerSentEvents(@Context SseEventSink eventSink, @Context Sse sse) {
new Thread(() -> {
for (int i = 0; i < 5; i++) {
// 1秒待つ
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {}
// イベントデータの作成
final OutboundSseEvent event = sse.newEventBuilder()
.name("message-to-client")
.data(String.class, "Hello world " + i + "!")
.build();
// 作成したイベントデータをクライアントに送信する
eventSink.send(event);
}
eventSink.close();
}).start();
}
}
実行結果
curlコマンドで上記のリソースにGETリクエストします。
ヘッダも見ると以下のようなリクエストとレスポンスになっています。ミソなのはレスポンスのヘッダにある
Content-Type: text/event-stream
ですね。
$ curl.exe http://localhost:8080/events/new -H 'Accept: text/event-stream' -v
* Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /events/new HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/8.0.1
> Accept: text/event-stream
>
< HTTP/1.1 200 OK
< Date: Sun, 06 Aug 2023 16:43:16 GMT
< Content-Type: text/event-stream
< Transfer-Encoding: chunked
<
event: message-to-client
data: Hello world 0!
event: message-to-client
data: Hello world 1!
event: message-to-client
data: Hello world 2!
event: message-to-client
data: Hello world 3!
event: message-to-client
data: Hello world 4!
* Connection #0 to host localhost left intact
ブロードキャスト
ブロードキャスト用リソースでは、接続中のすべてのクライアントに一度にメッセージを送信します。
以下のコードは、事前にエンドポイント /broadcast
に対してGETリクエストしていたクライエントすべてに対して、/broadcast
にPOSTしたメッセージを送信します。
@Singleton
@Path("broadcast")
public class SseBroadcastResource {
private Sse sse;
// 以下のインスタンスに登録されたSseEventSinkインスタンスすべてに一度にメッセージを送信できる。
private SseBroadcaster broadcaster;
// 引数のsseはリソース登録時のクラス生成時にjerseyのフレームワーク側で依存性注入してくれる
public SseBroadcastResource(@Context final Sse sse) {
this.sse = sse;
this.broadcaster = sse.newBroadcaster();
}
// 受信待ち状態のクライエントに対してメッセージを一斉送信します。
@POST
@Produces(MediaType.TEXT_PLAIN)
@Consumes(MediaType.TEXT_PLAIN)
public String broadcastMessage(String message) {
final OutboundSseEvent event = sse.newEventBuilder()
.name("message")
.mediaType(MediaType.TEXT_PLAIN_TYPE)
.data(String.class, message)
.build();
broadcaster.broadcast(event);
return "Message '" + message + "' has been broadcast.";
}
// ここにリクエストしたクライアントは受信待ち状態になります。
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void listenToBroadcast(@Context SseEventSink eventSink) {
this.broadcaster.register(eventSink);
}
}
実行結果
curlコマンドを用いて、2つの受信用クライアントと1つの送信用クライアントを用意してブロードキャストSSEの動作を確認します。
以下のgif画像の右側2つのターミナルが受信用クライアント、左側のターミナルが送信用クライアントとしています。
最初に2つの受信用クライアントそれぞれでエンドポイント/broadcast
にGETリクエストをして受信待ち状態にしておきます。
この状態で送信用クライアントから/broadcast
にPOSTリクエストで送ったメッセージが、受信用クライアントすべてに送られます。
まとめ
今回はDropwizard+Jerseyを用いてSSE方式でメッセージ送信できるWebAPIを作成してみました。
サーバからデータ送信する方法は他にもポーリング方式やWebSocket方式などありますが、サーバから一方的にクライアントにレスポンス良くデータを送りたいときはSSE方式が適しているのかなと思います。
Webアプリケーションフレームワークの種類ごとにSSEの書き方などに違いがあると思うのですが、今回はひとまずSSE通信がどういうものかということを中心に把握できたのでよかったです。