最近、Spring WebFlux使って色々作り始めました。
Spring WebFlux使うと、
@GetMapping
public Flux<Hoge> all() {
return Flux.create(sink -> {
// ...
})
}
みたいな、記述をControllerに書きますが、これってどうやってブラウザで受け取るんだっけ?
というのをちょっと調べてやってみたので書きます。
まず、Fluxが返るリクエストの返り値
こちらの記事がとても参考になります。
BLOG.IK.AM - はじめてのSpring WebFlux (その1 - Spring WebFluxを試す)
Content-Type: text/event-stream
(Server-Sent Event
)形式か、Content-Type: application/stream+json
の形式で返ってくるようですね。
それぞれ、下記のような形式でbodyが返るようです。
-
Content-Type: text/event-stream
data:{"key":"value" ... } data:{"key":"value" ... } data:{"key":"value" ... }
-
Content-Type: application/stream+json
{"key":"value" ... } {"key":"value" ... } {"key":"value" ... }
Server-Sent Event
はググると、Server-Sent Events の利用 | MDNとかの説明でEventSource使うってことがすぐにわかりましたが、application/stream+json
はどうするのでしょうか?
Fetch APIからapplication/stream+json
でアクセスする
Fetch APIを使って、次のようにアクセスできるようです。
const url = "..."; // リクエストするURL
const callback = json => { /* ... 各行JSONを処理するロジック */ };
const decoder = new TextDecoder();
const abortController = new AbortController();
const { signal } = abortController;
// fetch開始
fetch(url, {
signal,
headers: {
Accept: "application/stream+json"
}
}).then(response => {
let buffer = "";
/**
* ストリームから読み込まれた部分文字列を処理します
* @param {string} chunk 読み込まれた文字列
* @returns {void}
*/
function consumeChunk(chunk) {
buffer += chunk;
// 改行コードで分割し、各JSON行を取り出します
// https://en.wikipedia.org/wiki/JSON_streaming#Line-delimited_JSON
// http://jsonlines.org/
const re = /(.*?)(\r\n|\r|\n)/g;
let result;
let lastIndex = 0;
while ((result = re.exec(buffer))) {
const data = result[1];
if (data.trim()) {
callback(JSON.parse(data)); // 処理するcallbackにJSONを渡す。
}
({ lastIndex } = re);
}
// 改行コードが存在しなければ、残りの文字列は次のreadと結合して処理します
buffer = buffer.slice(lastIndex);
}
// ストリームのReaderを作成
const reader = response.body.getReader();
// 読み込み処理本体
function readNext() {
return reader.read().then(({ done, value }) => {
if (!done) {
// --読み込み処理--
consumeChunk(decoder.decode(value));
return readNext();
}
// --終了処理--
if (buffer.trim()) {
// 最後に残った文字列があれば処理します。
// 最後の行データの後ろに改行コードが含まれていなければここに到達します。
// Spring WebFluxでは起きないようですが、 http://jsonlines.org/ を見ると最後の改行コードがない場合もある印象だったので念のため実装します。
// `consumeChunk` は改行コードを渡さないと行を認識しないので改行コードを渡します。
consumeChunk("\n");
}
return response;
});
}
return readNext();
});
}
以降で何をしてるか説明していきます。
AbortController
これはFluxの取得とはあまり関係ありませんが、fetch
を途中でキャンセルしたいなら必要です。
const abortController = new AbortController();
const { signal } = abortController;
// fetch開始
fetch(url, {
signal,
headers: {
Accept: "application/stream+json"
}
})
fetch
のオプションにsignal
を渡しておくと。
abortController.abort()
とするとfetch
を途中でやめることができます。
この際、JavaのController側でも、キャンセルされたら処理を中断するようにしておくとさらに良さそうです。
@GetMapping
public Flux<Hoge> all() {
return Flux.create(sink -> {
// ...
if (sink.isCancelled()) {
// キャンセルされているので処理を中断しましょう。
}
// ...
})
}
onCancel
を使う方法もあります。
response
をStreamで受ける
ReadableStream.getReader() | MDNを参考にします。
次のように、ReadableStreamを使うと、レスポンスをStreamで受けられるようです。
fetch(...).then(response => {
// ...
// ストリームのReaderを作成
const reader = response.body.getReader();
function readNext() {
return reader.read().then(({ done, value }) => {
if (!done) {
/* valueを処理する */
// ...
return readNext();
}
return; /* 終了 */
});
}
return readNext();
})
value
をdecodeする
ReadableStreamから取得されたvalue
はUint8Arrayでやってくるようなので、TextDecoderを使ってdecodeします。
const decoder = new TextDecoder();
// ...
const sValue = decoder.decode(value) // Uint8Arrayをdecodeしてstringに変換
chunk
を処理する
ここまでで、Streamの中身を文字列で取得するところまで出来ました。
ここで実際にJSONをparseしていきます。
let buffer = "";
/**
* ストリームから読み込まれた部分文字列を処理します
* @param {string} chunk 読み込まれた文字列
* @returns {void}
*/
function consumeChunk(chunk) {
buffer += chunk;
// 改行コードで分割し、各JSON行を取り出します
// https://en.wikipedia.org/wiki/JSON_streaming#Line-delimited_JSON
// http://jsonlines.org/
const re = /(.*?)(\r\n|\r|\n)/g;
let result;
let lastIndex = 0;
while ((result = re.exec(buffer))) {
const data = result[1];
if (data.trim()) {
callback(JSON.parse(data)); // 処理するcallbackにJSONを渡す。
}
({ lastIndex } = re);
}
// 改行コードが存在しなければ、残りの文字列は次のreadと結合して処理します
buffer = buffer.slice(lastIndex);
}
何やらごにょごにょやっていますが、以降で説明します。
改行コードでJSONを分割する
application/stream+json
は先で書いたように次のようなレスポンスが返ってきます。
{"key":"value" ... }
{"key":"value" ... }
{"key":"value" ... }
このレスポンスは、改行コードで区切られたJSONです。
なので、改行コードで区切って、各行JSONをJSON.parse
しています。
chunkを結合する
ReadableStreamからread()
で取得した文字列は、完全なJSONではありません。あくまでレスポンス全体の一部分です。
なので、chunkに改行コードが含まれない場合は、次のchunkと結合して処理するようにしています。
説明は以上です。
感想
EventSourceを使った方法は、ヘッダー情報が渡せなかったりして、組み込もうとしているアプリケーションによっては、工夫や変更が必要になるかもしれません。
(例えば、認証トークンをヘッダーに載せている前提でサーバーサイドの処理をしている場合。とか)
しかし、このFetch APIを使う方法であれば、普通のHTTPアクセスとほとんど一緒なので、(Fetch APIに対応したブラウザであれば)気軽に使うことができる印象でした。