Java
spring
JavaEE
spring-boot
spring-mvc

Spring MVC(+Spring Boot)上での非同期リクエストを理解する -後編(HTTP Streaming)-

More than 1 year has passed since last update.

今回は、前回の投稿から引き続き、Spring MVCベースのWebアプリケーションでServlet 3.0からサポートされた非同期処理を利用する方法の説明を行います。前回は「非同期実行が終了してからHTTPレスポンスを開始する方式」について説明しましたが、今回は「非同期実行の処理中にHTTPレスポンスを開始する方式」について説明します。Spring MVCがサポートしている方式については、こちらをご覧ください。

動作確認環境

  • Java SE 8
  • Tomcat 8.5.5 (Servlet 3.1)
  • Spring Framework 4.3.3.RELESAE
  • Spring Boot 1.4.1.RELEASE

前提知識

Spring MVCでPush型の非同期処理を実装してみる

非同期実行の処理中にHTTPレスポンスを開始する方式は、ひとつのHTTPレスポンスを複数のイベントに分割して返却(Push)します。これは、"HTTP Streaming"として知られているレスポンス方式です。まずは、Spring MVCでPush型の非同期処理を実装してみましょう。

package com.example.component;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;

import java.io.IOException;

@Controller
@RequestMapping("/streaming")
public class StreamingController {

    @Autowired
    AsyncHelper asyncHelper;

    @RequestMapping(method = RequestMethod.GET)
    public ResponseBodyEmitter streaming(@RequestParam(defaultValue = "1") long eventNumber, @RequestParam(defaultValue = "0") long intervalSec) throws IOException {
        Console.println("Start get.");

        ResponseBodyEmitter emitter = new ResponseBodyEmitter();
        asyncHelper.streaming(emitter, eventNumber, intervalSec);

        Console.println("End get.");
        return emitter;
    }

}
@Component
public class AsyncHelper {
    // ...
    @Async
    public void streaming(ResponseBodyEmitter emitter, long eventNumber, long intervalSec) throws IOException {
        Console.println("Start Async processing.");

        for (long i = 1; i <= eventNumber; i++) {
            sleep(intervalSec);
            emitter.send("msg" + i + "\r\n");
        }

        emitter.complete();

        Console.println("End Async processing.");
    }
    // ...
}

cURLやブラウザを使用してストリーミング処理にアクセスします。以下のパラメータでアクセスすると、1秒間隔で2回のイベント(メッセージ)をクライアント側にPushします。

$ curl -D - http://localhost:8080/streaming?eventNumber=2\&intervalSec=1
コンソール
HTTP/1.1 200 
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:00:22 GMT

msg1
msg2

テキストでは表現できませんが、「(1秒待機) -> msg1 -> (1秒待機) -> msg2」といった感じでコンソールに表示されます。

Spring MVCのPush型非同期処理を紐解く

基本的な仕組みは「非同期実行が終了してからHTTPレスポンスを開始する方式」と同じですが、非同期処理中にクライアントにレスポンスを開始する点が異なります。絵にすると以下のような感じです。

spring-mvc-async-streaming.png

ポイントは、Controllerの処理が終了した後にレスポンスをいったんフラッシュすることで、分割レスポンス(Transfer-Encoding: chunked)になることをクライアントに通知している点です。以降の処理では、クライアントはレスポンスデータが分割して送られてくることを前提に処理を行う必要があります。

絵には表現していませんが、クライアントにPushするデータはHttpMessageConverterによって変換される仕組みになっています。HttpMessageConverterを利用すると、Jacksonを使用してJavaBeanをJSONに変換してからクライアントにデータをPushすることができます。

SseEmitterの利用

ResponseBodyEmitterの仕組みを利用してServer-Sent Events(SSE)のサーバー実装を行ってみましょう。Spring MVCは、SSEの実装をサポートするクラスとしてSseEmitterというクラスを提供しています。SseEmitterResponseBodyEmitterのサブクラスで、HTTPレスポンスのコンテンツタイプ(text/event-stream)とデータ形式を拡張しています。

package com.example.component;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;

@Controller
@RequestMapping("/sse")
public class SseController {

    @Autowired
    AsyncHelper asyncHelper;

    @RequestMapping(method = RequestMethod.GET)
    public SseEmitter sse(@RequestParam(defaultValue = "1") long eventNumber, @RequestParam(defaultValue = "0") long intervalSec) throws IOException {
        Console.println("Start get.");

        SseEmitter emitter = new SseEmitter();
        asyncHelper.sse(emitter, eventNumber, intervalSec);

        Console.println("End get.");
        return emitter;
    }

}
@Component
public class AsyncHelper {
    // ...
    @Async
    public void sse(SseEmitter emitter, long eventNumber, long intervalSec) throws IOException {
        Console.println("Start Async processing.");

        for (long i = 1; i <= eventNumber; i++) {
            sleep(intervalSec);
            emitter.send("msg" + i);
        }

        emitter.complete();

        Console.println("End Async processing.");
    }
    // ...
}

cURLやブラウザを使用してSSE処理にアクセスします。以下のパラメータでアクセスすると、1秒間隔で2回のSSEイベントをクライアント側にPushします。

$ curl -D - http://localhost:8080/sse?eventNumber=2\&intervalSec=1
コンソール
HTTP/1.1 200 
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:30:25 GMT

data:msg1

data:msg2

イベントストリームのフィールドの指定

SSEのイベントストリームにはdata以外にもいくつかのフィールドが定義されています。それらのフィールドに値を指定する場合は、SseEventBuilderを使います。SseEventBuilderのインスタンスは、SseEmitter#eventメソッドを呼び出して取得することができます。

emitter.send(SseEmitter.event()
        .comment("This is test event")
        .id(UUID.randomUUID().toString())
        .name("onlog")
        .reconnectTime(3000)
        .data("msg" + i));
コンソール
HTTP/1.1 200 
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:31:35 GMT

:This is test event
id:c62ae77f-85fe-41ac-bfff-c03b0020a816
event:onlog
retry:3000
data:msg1

:This is test event
id:d283757e-9d67-4be5-b858-3c6b543c7b5d
event:onlog
retry:3000
data:msg2

タイムアウト値の指定

デフォルトのタイムアウト値は、前回紹介したとおりSpring MVC側の設定で行うことができます。ここでは処理毎にタイムアウト値を指定する方法を紹介します。

@RequestMapping(method = RequestMethod.GET)
public SseEmitter sse(@RequestParam(defaultValue = "1") long eventNumber, @RequestParam(defaultValue = "0") long intervalSec) throws IOException {
    // ....
    SseEmitter emitter = new SseEmitter(0L);
    // ...
    return emitter;
}

ResponseBodyEmitterもコンストラクタで指定できます。

タイムアウト時の動作

タイムアウトになるようにリクエストを送ると、以下のようなレスポンスが返却されます。

$ curl -D - http://localhost:8080/sse?eventNumber=2\&intervalSec=2
コンソール
HTTP/1.1 200 
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:35:27 GMT

:This is test event
id:ff6799cc-0a35-4bde-95b8-5bc1b621abcf
event:onlog
retry:3000
data:msg1

{"timestamp":"2016-10-05T16:35:31.060+0000","status":200,"error":"OK","exception":"org.springframework.web.context.request.async.AsyncRequestTimeoutException","message":"No message available","path":"/sse"}

タイムアウトになるとAsyncRequestTimeoutExceptionが発生する仕組みになっているので、Spring MVCの動作としては正しそうですが、レスポンスがイベントストリームではなくJSONになってしまっています。ここではイベントストリームとしてタイムアウトを通知するようにしてみます。

タイムアウトをイベントストリームに通知

ControllerクラスでAsyncRequestTimeoutExceptionの例外ハンドリングを実装します。ここでは、SseEventBuilderを使ってイベントストリームのデータをレスポンスしています。

@Controller
@RequestMapping("/sse")
public class SseController {
    private static final Logger logger = LoggerFactory.getLogger(SseController.class);
    // ...
    @ExceptionHandler
    @ResponseBody
    public String handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e) {
        logger.error("timeout error is occurred.", e);
        return SseEmitter.event().data("timeout!!").build().stream()
                .map(d -> d.getData().toString())
                .collect(Collectors.joining());
    }
}

タイムアウトを発生させると、タイムアウトを通知するイベントがレスポンスされました。

コンソール
HTTP/1.1 200 
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:46:31 GMT

data:msg1

data:timeout!!

例外ハンドリング

非同期処理の中で例外が発生すると、タイムアウトが検知されるまでクライアントとの通信が終了しません。ためしに非同期処理の中で例外を発生させてみましょう。

@Component
public class AsyncHelper {
    // ...
    @Async
    public void sse(SseEmitter emitter, long eventNumber, long intervalSec) throws IOException {
        Console.println("Start Async processing.");

        if (intervalSec == 999) {
            throw new IllegalStateException("Special parameter for confirm error.");
        }
        // ...
    }
    // ...
}
$ curl -D - http://localhost:8080/sse?eventNumber=2\&intervalSec=999
コンソール
HTTP/1.1 200 
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:48:46 GMT

data:timeout!!

これは、非同期処理の中で例外ハンドリングを行う必要があることを意味します。なお、例外をハンドリングした後の処理としては、以下の2つの方法があります。

  • sendメソッドを使って個別にエラーを通知する
  • Spring MVCのExceptionResolverの仕組みと連携してエラーを通知する

個別にエラーを通知

個別にエラーを通知する場合は、以下のように例外をキャッチし、sendメソッドを使ってエラーを通知します。

try {
    // ...
} catch (Exception e) {
    logger.error("system error is occurred."e);
    emitter.send("error !!");
    emitter.complete();
    return;
}
コンソール
HTTP/1.1 200 
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:51:38 GMT

data:error !!

ExceptionResolverの仕組みと連携してエラーを通知

ExceptionResolverの仕組みと連携する場合は、以下のように例外をキャッチし、completeWithErrorメソッドを使ってエラーを通知します。

try {
    // ...
} catch (Exception e) {
    emitter.completeWithError(e);
    return;
}

completeWithErrorメソッドに例外を設定すると、Spring MVCのExceptionResolverが呼び出されるので、Controllerクラスで例外ハンドリングを実装します。ここでは、SseEventBuilderを使ってイベントストリームのデータをレスポンスしています。

@Controller
@RequestMapping("/sse")
public class SseController {
    private static final Logger logger = LoggerFactory.getLogger(SseController.class);
    // ...
    @ExceptionHandler
    @ResponseBody
    public String handleException(Exception e) {
        logger.error("system error is occurred.", e);
        return SseEmitter.event().data("error !!").build().stream()
                .map(d -> d.getData().toString())
                .collect(Collectors.joining());
    }
}
コンソール
HTTP/1.1 200 OK
Server: Apache-Coyote/1.1
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Date: Sun, 22 May 2016 18:30:11 GMT

data:error !!

StreamingResponseBodyの利用

Spring MVCは、HTTPレスポンスのOutputStreamに直接データを出力するための仕組みも提供しています。この仕組みは、前回紹介したCallable利用時と同様に、Spring MVC管理下のスレッドを使って実行します。

@Controller
@RequestMapping("/streaming")
public class StreamingController {

    @RequestMapping(path = "direct", method = RequestMethod.GET)
    public StreamingResponseBody directStreaming(@RequestParam(defaultValue = "1") long eventNumber, @RequestParam(defaultValue = "0") long intervalSec) throws IOException {
        Console.println("Start get.");

        // StreamingResponseBodyのwriteToメソッドの中に非同期処理を実装する
        // StreamingResponseBodyは関数型インターフェースなのでJava SE 8+ならラムダ式が使えます。
        StreamingResponseBody responseBody = outputStream -> {
            Console.println("Start Async processing.");

            if (intervalSec == 999) {
                throw new IllegalStateException("Special parameter for confirm error.");
            }

            for (long i = 1; i <= eventNumber; i++) {
                try {
                    TimeUnit.SECONDS.sleep(intervalSec);
                } catch (InterruptedException e) {
                    Thread.interrupted();
                }
                outputStream.write(("msg" + i + "\r\n").getBytes());
                outputStream.flush();
            }

            Console.println("End Async processing.");
        };

        Console.println("End get.");
        return responseBody;
    }
}
$ curl -D - http://localhost:8080/streaming/direct?eventNumber=2\&intervalSec=1
コンソール
HTTP/1.1 200 
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:53:43 GMT

msg1
msg2

タイムアウトのハンドリング

タイムアウトになるようにリクエストを送ると、以下のようなレスポンスが返却されます。

$ curl -D - http://localhost:8080/streaming/direct?eventNumber=2\&intervalSec=2
コンソール
HTTP/1.1 200 
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:54:40 GMT

msg1
{"timestamp":"2016-10-05T16:54:41.902+0000","status":200,"error":"OK","exception":"org.springframework.web.context.request.async.AsyncRequestTimeoutException","message":"No message available","path":"/streaming/direct"}

エラー時のレスポンスをカスタマイズしたい場合は、以下のように@ExceptionHandlerを使用してAsyncRequestTimeoutExceptionを半おリングします。

@Controller
@RequestMapping("/streaming")
public class StreamingController {
    private static final Logger logger = LoggerFactory.getLogger(StreamingController.class);
    // ...
    @ExceptionHandler
    @ResponseStatus(HttpStatus.SERVICE_UNAVAILABLE)
    @ResponseBody
    public String handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e) {
        logger.error("Timeout error is occurred.", e);
        return "timeout!!";
    }
    // ...
}

タイムアウトを発生させると、タイムアウトを通知するメッセージがレスポンスされました。

コンソール
HTTP/1.1 200 
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 17:01:53 GMT

msg1
timeout!!

例外ハンドリング

非同期処理内で発生した例外は、Spring MVCのExceptionResolverでハンドリングされるため、StreamingResponseBody独自の実装は不要です。ここでは、@ExceptionHandlerメソッドを使用してハンドリングする方法を紹介します。

@Controller
@RequestMapping("/streaming")
public class StreamingController {
    private static final Logger logger = LoggerFactory.getLogger(StreamingController.class);
    // ...
    @ExceptionHandler
    @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
    @ResponseBody
    public String handleException(Exception e) {
        logger.error("System error is occurred.", e);
        return "error!!";
    }
}

例外を発生させると、エラーを通知するメッセージがレスポンスされました。

コンソール
HTTP/1.1 500 
Content-Type: text/plain;charset=UTF-8
Content-Length: 7
Date: Wed, 05 Oct 2016 17:06:50 GMT
Connection: close

error!!

Spring Boot上での実装

Spring Boot独自の仕組みはありません。(これまで説明してきた内容は、Spring Boot上で動かしていますw)

まとめ

2回にわけてSpring MVCベースのWebアプリケーションでServlet 3.0からサポートされた非同期処理を利用する方法について説明しました。
なお、今回紹介した方式は、サーバーから任意のタイミングでデータを送信(Push)したい場合に利用する方式です。サーバーから任意のタイミングでデータを送信する方法として有名なのがWebSokcetです。JavaでもJava EE 7からWebSokcetがサポートされており、TomcatなどのAPサーバー上でも利用することができます。SpringもWebScoketとの連携機能を提供しており、WebSokcetアプリをSpringスタイルで開発することができます。Spring上でのWebSokcetの開発方法については、どこかのタイミングで紹介したいと思いますが、たぶんもうちょっと先になるでしょう・・・ :sweat_smile:

参考サイト