14
11

More than 5 years have passed since last update.

Spring WebFluxのFlux結果をJSからFetch APIで取得する

Last updated at Posted at 2019-04-21

最近、Spring WebFlux使って色々作り始めました。
Spring WebFlux使うと、

    @GetMapping
    public Flux<Hoge> all() {
        return Flux.create(sink -> {
            // ...
        })
    }

みたいな、記述をControllerに書きますが、これってどうやってブラウザで受け取るんだっけ?
というのをちょっと調べてやってみたので書きます。

まず、Fluxが返るリクエストの返り値

こちらの記事がとても参考になります。
BLOG.IK.AM - はじめてのSpring WebFlux (その1 - Spring WebFluxを試す)

Content-Type: text/event-stream(Server-Sent Event)形式か、Content-Type: application/stream+jsonの形式で返ってくるようですね。

それぞれ、下記のような形式でbodyが返るようです。

  • Content-Type: text/event-stream

    data:{"key":"value" ... }
    
    data:{"key":"value" ... }
    
    data:{"key":"value" ... }
    
    
  • Content-Type: application/stream+json

    {"key":"value" ... }
    {"key":"value" ... }
    {"key":"value" ... }
    

Server-Sent Eventはググると、Server-Sent Events の利用 | MDNとかの説明でEventSource使うってことがすぐにわかりましたが、application/stream+jsonはどうするのでしょうか?

Fetch APIからapplication/stream+jsonでアクセスする

Fetch APIを使って、次のようにアクセスできるようです。

  const url = "..."; // リクエストするURL
  const callback = json => { /* ... 各行JSONを処理するロジック */ };

  const decoder = new TextDecoder();

  const abortController = new AbortController();
  const { signal } = abortController;

  // fetch開始
  fetch(url, {
    signal,
    headers: {
      Accept: "application/stream+json"
    }
  }).then(response => {
    let buffer = "";
    /**
     * ストリームから読み込まれた部分文字列を処理します
     * @param {string} chunk 読み込まれた文字列
     * @returns {void}
     */
    function consumeChunk(chunk) {
      buffer += chunk;

      // 改行コードで分割し、各JSON行を取り出します
      // https://en.wikipedia.org/wiki/JSON_streaming#Line-delimited_JSON
      // http://jsonlines.org/
      const re = /(.*?)(\r\n|\r|\n)/g;
      let result;
      let lastIndex = 0;
      while ((result = re.exec(buffer))) {
        const data = result[1];
        if (data.trim()) {
          callback(JSON.parse(data)); // 処理するcallbackにJSONを渡す。
        }
        ({ lastIndex } = re);
      }
      // 改行コードが存在しなければ、残りの文字列は次のreadと結合して処理します
      buffer = buffer.slice(lastIndex);
    }
    // ストリームのReaderを作成
    const reader = response.body.getReader();
    // 読み込み処理本体
    function readNext() {
      return reader.read().then(({ done, value }) => {
        if (!done) {
          // --読み込み処理--
          consumeChunk(decoder.decode(value));
          return readNext();
        }

        // --終了処理--
        if (buffer.trim()) {
          // 最後に残った文字列があれば処理します。
          // 最後の行データの後ろに改行コードが含まれていなければここに到達します。
          // Spring WebFluxでは起きないようですが、 http://jsonlines.org/ を見ると最後の改行コードがない場合もある印象だったので念のため実装します。
          // `consumeChunk` は改行コードを渡さないと行を認識しないので改行コードを渡します。
          consumeChunk("\n");
        }

        return response;
      });
    }
    return readNext();
  });
}

以降で何をしてるか説明していきます。

AbortController

これはFluxの取得とはあまり関係ありませんが、fetchを途中でキャンセルしたいなら必要です。

  const abortController = new AbortController();
  const { signal } = abortController;

  // fetch開始
  fetch(url, {
    signal,
    headers: {
      Accept: "application/stream+json"
    }
  })

fetchのオプションにsignalを渡しておくと。

abortController.abort()

とするとfetchを途中でやめることができます。
この際、JavaのController側でも、キャンセルされたら処理を中断するようにしておくとさらに良さそうです。

    @GetMapping
    public Flux<Hoge> all() {
        return Flux.create(sink -> {
            // ...
            if (sink.isCancelled()) {
                // キャンセルされているので処理を中断しましょう。
            }
            // ...
        })
    }

onCancelを使う方法もあります。

responseをStreamで受ける

Readable​Stream​.get​Reader() | MDNを参考にします。

次のように、ReadableStreamを使うと、レスポンスをStreamで受けられるようです。

  fetch(...).then(response => {
    // ...
    // ストリームのReaderを作成
    const reader = response.body.getReader();
    function readNext() {
      return reader.read().then(({ done, value }) => {
        if (!done) {
          /* valueを処理する */
          // ...
          return readNext();
        }

        return; /* 終了 */
      });
    }
    return readNext();
  })

valueをdecodeする

ReadableStreamから取得されたvalueUint8Arrayでやってくるようなので、TextDecoderを使ってdecodeします。

const decoder = new TextDecoder();

// ...

const sValue = decoder.decode(value) // Uint8Arrayをdecodeしてstringに変換

chunkを処理する

ここまでで、Streamの中身を文字列で取得するところまで出来ました。
ここで実際にJSONをparseしていきます。

    let buffer = "";
    /**
     * ストリームから読み込まれた部分文字列を処理します
     * @param {string} chunk 読み込まれた文字列
     * @returns {void}
     */
    function consumeChunk(chunk) {
      buffer += chunk;

      // 改行コードで分割し、各JSON行を取り出します
      // https://en.wikipedia.org/wiki/JSON_streaming#Line-delimited_JSON
      // http://jsonlines.org/
      const re = /(.*?)(\r\n|\r|\n)/g;
      let result;
      let lastIndex = 0;
      while ((result = re.exec(buffer))) {
        const data = result[1];
        if (data.trim()) {
          callback(JSON.parse(data)); // 処理するcallbackにJSONを渡す。
        }
        ({ lastIndex } = re);
      }
      // 改行コードが存在しなければ、残りの文字列は次のreadと結合して処理します
      buffer = buffer.slice(lastIndex);
    }

何やらごにょごにょやっていますが、以降で説明します。

改行コードでJSONを分割する

application/stream+jsonは先で書いたように次のようなレスポンスが返ってきます。

{"key":"value" ... }
{"key":"value" ... }
{"key":"value" ... }

このレスポンスは、改行コードで区切られたJSONです。
なので、改行コードで区切って、各行JSONをJSON.parseしています。

chunkを結合する

ReadableStreamからread()で取得した文字列は、完全なJSONではありません。あくまでレスポンス全体の一部分です。

なので、chunkに改行コードが含まれない場合は、次のchunkと結合して処理するようにしています。


説明は以上です。

感想

EventSourceを使った方法は、ヘッダー情報が渡せなかったりして、組み込もうとしているアプリケーションによっては、工夫や変更が必要になるかもしれません。
(例えば、認証トークンをヘッダーに載せている前提でサーバーサイドの処理をしている場合。とか)

しかし、このFetch APIを使う方法であれば、普通のHTTPアクセスとほとんど一緒なので、(Fetch APIに対応したブラウザであれば)気軽に使うことができる印象でした。

14
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
11