今回は、前回の投稿から引き続き、Spring MVCベースのWebアプリケーションでServlet 3.0からサポートされた非同期処理を利用する方法の説明を行います。前回は「非同期実行が終了してからHTTPレスポンスを開始する方式」について説明しましたが、今回は「非同期実行の処理中にHTTPレスポンスを開始する方式」について説明します。Spring MVCがサポートしている方式については、こちらをご覧ください。
動作確認環境
- Java SE 8
- Tomcat 8.5.5 (Servlet 3.1)
- Spring Framework 4.3.3.RELESAE
- Spring Boot 1.4.1.RELEASE
前提知識
Spring MVCでPush型の非同期処理を実装してみる
非同期実行の処理中にHTTPレスポンスを開始する方式は、ひとつのHTTPレスポンスを複数のイベントに分割して返却(Push)します。これは、"HTTP Streaming"として知られているレスポンス方式です。まずは、Spring MVCでPush型の非同期処理を実装してみましょう。
package com.example.component;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import java.io.IOException;
@Controller
@RequestMapping("/streaming")
public class StreamingController {
@Autowired
AsyncHelper asyncHelper;
@RequestMapping(method = RequestMethod.GET)
public ResponseBodyEmitter streaming(@RequestParam(defaultValue = "1") long eventNumber, @RequestParam(defaultValue = "0") long intervalSec) throws IOException {
Console.println("Start get.");
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
asyncHelper.streaming(emitter, eventNumber, intervalSec);
Console.println("End get.");
return emitter;
}
}
@Component
public class AsyncHelper {
// ...
@Async
public void streaming(ResponseBodyEmitter emitter, long eventNumber, long intervalSec) throws IOException {
Console.println("Start Async processing.");
for (long i = 1; i <= eventNumber; i++) {
sleep(intervalSec);
emitter.send("msg" + i + "\r\n");
}
emitter.complete();
Console.println("End Async processing.");
}
// ...
}
cURLやブラウザを使用してストリーミング処理にアクセスします。以下のパラメータでアクセスすると、1秒間隔で2回のイベント(メッセージ)をクライアント側にPushします。
$ curl -D - http://localhost:8080/streaming?eventNumber=2\&intervalSec=1
HTTP/1.1 200
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:00:22 GMT
msg1
msg2
テキストでは表現できませんが、「(1秒待機) -> msg1
-> (1秒待機) -> msg2
」といった感じでコンソールに表示されます。
Spring MVCのPush型非同期処理を紐解く
基本的な仕組みは「非同期実行が終了してからHTTPレスポンスを開始する方式」と同じですが、非同期処理中にクライアントにレスポンスを開始する点が異なります。絵にすると以下のような感じです。
ポイントは、Controllerの処理が終了した後にレスポンスをいったんフラッシュすることで、分割レスポンス(Transfer-Encoding: chunked
)になることをクライアントに通知している点です。以降の処理では、クライアントはレスポンスデータが分割して送られてくることを前提に処理を行う必要があります。
絵には表現していませんが、クライアントにPushするデータはHttpMessageConverter
によって変換される仕組みになっています。HttpMessageConverter
を利用すると、Jacksonを使用してJavaBeanをJSONに変換してからクライアントにデータをPushすることができます。
SseEmitterの利用
ResponseBodyEmitter
の仕組みを利用してServer-Sent Events(SSE)のサーバー実装を行ってみましょう。Spring MVCは、SSEの実装をサポートするクラスとしてSseEmitter
というクラスを提供しています。SseEmitter
はResponseBodyEmitter
のサブクラスで、HTTPレスポンスのコンテンツタイプ(text/event-stream
)とデータ形式を拡張しています。
package com.example.component;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
@Controller
@RequestMapping("/sse")
public class SseController {
@Autowired
AsyncHelper asyncHelper;
@RequestMapping(method = RequestMethod.GET)
public SseEmitter sse(@RequestParam(defaultValue = "1") long eventNumber, @RequestParam(defaultValue = "0") long intervalSec) throws IOException {
Console.println("Start get.");
SseEmitter emitter = new SseEmitter();
asyncHelper.sse(emitter, eventNumber, intervalSec);
Console.println("End get.");
return emitter;
}
}
@Component
public class AsyncHelper {
// ...
@Async
public void sse(SseEmitter emitter, long eventNumber, long intervalSec) throws IOException {
Console.println("Start Async processing.");
for (long i = 1; i <= eventNumber; i++) {
sleep(intervalSec);
emitter.send("msg" + i);
}
emitter.complete();
Console.println("End Async processing.");
}
// ...
}
cURLやブラウザを使用してSSE処理にアクセスします。以下のパラメータでアクセスすると、1秒間隔で2回のSSEイベントをクライアント側にPushします。
$ curl -D - http://localhost:8080/sse?eventNumber=2\&intervalSec=1
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:30:25 GMT
data:msg1
data:msg2
イベントストリームのフィールドの指定
SSEのイベントストリームにはdata
以外にもいくつかのフィールドが定義されています。それらのフィールドに値を指定する場合は、SseEventBuilder
を使います。SseEventBuilder
のインスタンスは、SseEmitter#event
メソッドを呼び出して取得することができます。
emitter.send(SseEmitter.event()
.comment("This is test event")
.id(UUID.randomUUID().toString())
.name("onlog")
.reconnectTime(3000)
.data("msg" + i));
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:31:35 GMT
:This is test event
id:c62ae77f-85fe-41ac-bfff-c03b0020a816
event:onlog
retry:3000
data:msg1
:This is test event
id:d283757e-9d67-4be5-b858-3c6b543c7b5d
event:onlog
retry:3000
data:msg2
タイムアウト値の指定
デフォルトのタイムアウト値は、前回紹介したとおりSpring MVC側の設定で行うことができます。ここでは処理毎にタイムアウト値を指定する方法を紹介します。
@RequestMapping(method = RequestMethod.GET)
public SseEmitter sse(@RequestParam(defaultValue = "1") long eventNumber, @RequestParam(defaultValue = "0") long intervalSec) throws IOException {
// ....
SseEmitter emitter = new SseEmitter(0L);
// ...
return emitter;
}
ResponseBodyEmitter
もコンストラクタで指定できます。
タイムアウト時の動作
タイムアウトになるようにリクエストを送ると、以下のようなレスポンスが返却されます。
$ curl -D - http://localhost:8080/sse?eventNumber=2\&intervalSec=2
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:35:27 GMT
:This is test event
id:ff6799cc-0a35-4bde-95b8-5bc1b621abcf
event:onlog
retry:3000
data:msg1
{"timestamp":"2016-10-05T16:35:31.060+0000","status":200,"error":"OK","exception":"org.springframework.web.context.request.async.AsyncRequestTimeoutException","message":"No message available","path":"/sse"}
タイムアウトになるとAsyncRequestTimeoutException
が発生する仕組みになっているので、Spring MVCの動作としては正しそうですが、レスポンスがイベントストリームではなくJSONになってしまっています。ここではイベントストリームとしてタイムアウトを通知するようにしてみます。
タイムアウトをイベントストリームに通知
ControllerクラスでAsyncRequestTimeoutException
の例外ハンドリングを実装します。ここでは、SseEventBuilder
を使ってイベントストリームのデータをレスポンスしています。
@Controller
@RequestMapping("/sse")
public class SseController {
private static final Logger logger = LoggerFactory.getLogger(SseController.class);
// ...
@ExceptionHandler
@ResponseBody
public String handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e) {
logger.error("timeout error is occurred.", e);
return SseEmitter.event().data("timeout!!").build().stream()
.map(d -> d.getData().toString())
.collect(Collectors.joining());
}
}
タイムアウトを発生させると、タイムアウトを通知するイベントがレスポンスされました。
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:46:31 GMT
data:msg1
data:timeout!!
例外ハンドリング
非同期処理の中で例外が発生すると、タイムアウトが検知されるまでクライアントとの通信が終了しません。ためしに非同期処理の中で例外を発生させてみましょう。
@Component
public class AsyncHelper {
// ...
@Async
public void sse(SseEmitter emitter, long eventNumber, long intervalSec) throws IOException {
Console.println("Start Async processing.");
if (intervalSec == 999) {
throw new IllegalStateException("Special parameter for confirm error.");
}
// ...
}
// ...
}
$ curl -D - http://localhost:8080/sse?eventNumber=2\&intervalSec=999
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:48:46 GMT
data:timeout!!
これは、非同期処理の中で例外ハンドリングを行う必要があることを意味します。なお、例外をハンドリングした後の処理としては、以下の2つの方法があります。
-
send
メソッドを使って個別にエラーを通知する - Spring MVCの
ExceptionResolver
の仕組みと連携してエラーを通知する
個別にエラーを通知
個別にエラーを通知する場合は、以下のように例外をキャッチし、send
メソッドを使ってエラーを通知します。
try {
// ...
} catch (Exception e) {
logger.error("system error is occurred."e);
emitter.send("error !!");
emitter.complete();
return;
}
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:51:38 GMT
data:error !!
ExceptionResolverの仕組みと連携してエラーを通知
ExceptionResolver
の仕組みと連携する場合は、以下のように例外をキャッチし、completeWithError
メソッドを使ってエラーを通知します。
try {
// ...
} catch (Exception e) {
emitter.completeWithError(e);
return;
}
completeWithError
メソッドに例外を設定すると、Spring MVCのExceptionResolver
が呼び出されるので、Controllerクラスで例外ハンドリングを実装します。ここでは、SseEventBuilder
を使ってイベントストリームのデータをレスポンスしています。
@Controller
@RequestMapping("/sse")
public class SseController {
private static final Logger logger = LoggerFactory.getLogger(SseController.class);
// ...
@ExceptionHandler
@ResponseBody
public String handleException(Exception e) {
logger.error("system error is occurred.", e);
return SseEmitter.event().data("error !!").build().stream()
.map(d -> d.getData().toString())
.collect(Collectors.joining());
}
}
HTTP/1.1 200 OK
Server: Apache-Coyote/1.1
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Date: Sun, 22 May 2016 18:30:11 GMT
data:error !!
StreamingResponseBodyの利用
Spring MVCは、HTTPレスポンスのOutputStream
に直接データを出力するための仕組みも提供しています。この仕組みは、前回紹介したCallable
利用時と同様に、Spring MVC管理下のスレッドを使って実行します。
@Controller
@RequestMapping("/streaming")
public class StreamingController {
@RequestMapping(path = "direct", method = RequestMethod.GET)
public StreamingResponseBody directStreaming(@RequestParam(defaultValue = "1") long eventNumber, @RequestParam(defaultValue = "0") long intervalSec) throws IOException {
Console.println("Start get.");
// StreamingResponseBodyのwriteToメソッドの中に非同期処理を実装する
// StreamingResponseBodyは関数型インターフェースなのでJava SE 8+ならラムダ式が使えます。
StreamingResponseBody responseBody = outputStream -> {
Console.println("Start Async processing.");
if (intervalSec == 999) {
throw new IllegalStateException("Special parameter for confirm error.");
}
for (long i = 1; i <= eventNumber; i++) {
try {
TimeUnit.SECONDS.sleep(intervalSec);
} catch (InterruptedException e) {
Thread.interrupted();
}
outputStream.write(("msg" + i + "\r\n").getBytes());
outputStream.flush();
}
Console.println("End Async processing.");
};
Console.println("End get.");
return responseBody;
}
}
$ curl -D - http://localhost:8080/streaming/direct?eventNumber=2\&intervalSec=1
HTTP/1.1 200
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:53:43 GMT
msg1
msg2
タイムアウトのハンドリング
タイムアウトになるようにリクエストを送ると、以下のようなレスポンスが返却されます。
$ curl -D - http://localhost:8080/streaming/direct?eventNumber=2\&intervalSec=2
HTTP/1.1 200
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 16:54:40 GMT
msg1
{"timestamp":"2016-10-05T16:54:41.902+0000","status":200,"error":"OK","exception":"org.springframework.web.context.request.async.AsyncRequestTimeoutException","message":"No message available","path":"/streaming/direct"}
エラー時のレスポンスをカスタマイズしたい場合は、以下のように@ExceptionHandler
を使用してAsyncRequestTimeoutException
を半おリングします。
@Controller
@RequestMapping("/streaming")
public class StreamingController {
private static final Logger logger = LoggerFactory.getLogger(StreamingController.class);
// ...
@ExceptionHandler
@ResponseStatus(HttpStatus.SERVICE_UNAVAILABLE)
@ResponseBody
public String handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e) {
logger.error("Timeout error is occurred.", e);
return "timeout!!";
}
// ...
}
タイムアウトを発生させると、タイムアウトを通知するメッセージがレスポンスされました。
HTTP/1.1 200
Transfer-Encoding: chunked
Date: Wed, 05 Oct 2016 17:01:53 GMT
msg1
timeout!!
例外ハンドリング
非同期処理内で発生した例外は、Spring MVCのExceptionResolver
でハンドリングされるため、StreamingResponseBody
独自の実装は不要です。ここでは、@ExceptionHandler
メソッドを使用してハンドリングする方法を紹介します。
@Controller
@RequestMapping("/streaming")
public class StreamingController {
private static final Logger logger = LoggerFactory.getLogger(StreamingController.class);
// ...
@ExceptionHandler
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
@ResponseBody
public String handleException(Exception e) {
logger.error("System error is occurred.", e);
return "error!!";
}
}
例外を発生させると、エラーを通知するメッセージがレスポンスされました。
HTTP/1.1 500
Content-Type: text/plain;charset=UTF-8
Content-Length: 7
Date: Wed, 05 Oct 2016 17:06:50 GMT
Connection: close
error!!
Spring Boot上での実装
Spring Boot独自の仕組みはありません。(これまで説明してきた内容は、Spring Boot上で動かしていますw)
まとめ
2回にわけてSpring MVCベースのWebアプリケーションでServlet 3.0からサポートされた非同期処理を利用する方法について説明しました。
なお、今回紹介した方式は、サーバーから任意のタイミングでデータを送信(Push)したい場合に利用する方式です。サーバーから任意のタイミングでデータを送信する方法として有名なのがWebSokcetです。JavaでもJava EE 7からWebSokcetがサポートされており、TomcatなどのAPサーバー上でも利用することができます。SpringもWebScoketとの連携機能を提供しており、WebSokcetアプリをSpringスタイルで開発することができます。Spring上でのWebSokcetの開発方法については、どこかのタイミングで紹介したいと思いますが、たぶんもうちょっと先になるでしょう・・・