infinite stream
accept header
- text/event-stream
- application/stream+json
back pressure
- request(n)
- write, flush
- repeat
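The request(n), write, flush, repeat loop can be sketched with the JDK's own java.util.concurrent.Flow interfaces standing in for Reactor. This is only an illustration under stated assumptions, not Spring's implementation: BackpressureLoopDemo, WriteFlushSubscriber and ListPublisher are hypothetical names, a ByteArrayOutputStream stands in for the response body, and everything runs on one thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical demo of request(1) -> write -> flush -> repeat; not Spring's code. */
class BackpressureLoopDemo {

    /** Asks for one element, writes and flushes it, and only then asks for the next. */
    static final class WriteFlushSubscriber implements Flow.Subscriber<String> {
        private final OutputStream out;
        private Flow.Subscription subscription;
        int requests; // number of request(1) calls made so far

        WriteFlushSubscriber(OutputStream out) { this.out = out; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            requestOne();
        }

        @Override public void onNext(String item) {
            try {
                out.write(("data:" + item + "\n\n").getBytes(StandardCharsets.UTF_8)); // write
                out.flush();                                                          // flush
            } catch (IOException ex) {
                subscription.cancel(); // stop the stream when a write fails
                return;
            }
            requestOne(); // repeat
        }

        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }

        private void requestOne() { requests++; subscription.request(1); }
    }

    /** Minimal single-threaded publisher that never emits more than was requested. */
    static final class ListPublisher implements Flow.Publisher<String> {
        private final List<String> items;

        ListPublisher(List<String> items) { this.items = items; }

        @Override public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int index;
                private long demand;
                private boolean emitting;
                private boolean finished;

                @Override public void request(long n) {
                    if (finished || n <= 0) { return; }
                    demand += n;
                    if (emitting) { return; } // re-entrant call: the loop below sees the new demand
                    emitting = true;
                    while (demand > 0 && index < items.size() && !finished) {
                        demand--;
                        subscriber.onNext(items.get(index++));
                    }
                    emitting = false;
                    if (index == items.size() && !finished) {
                        finished = true;
                        subscriber.onComplete();
                    }
                }

                @Override public void cancel() { finished = true; }
            });
        }
    }

    /** Streams the items through the loop and returns what reached the output. */
    static String stream(List<String> items) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        new ListPublisher(items).subscribe(new WriteFlushSubscriber(out));
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.print(stream(List.of("{\"id\":1}", "{\"id\":2}")));
    }
}
```

With three items the subscriber calls request(1) four times: once on subscribe and once after each write. The last call is answered with onComplete, which is the same shape as the request(1)/onNext pairs in the server log.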
Each runtime is adapted to a reactive ServerHttpRequest and ServerHttpResponse that expose the body of the request and response as Flux<DataBuffer>, rather than as InputStream and OutputStream, with reactive backpressure.
Annotation-based Programming Model
The same @Controller programming model and the same annotations used in Spring MVC are also supported in WebFlux. The main difference is that the underlying core framework contracts (HandlerMapping, HandlerAdapter) are non-blocking and operate on the reactive ServerHttpRequest and ServerHttpResponse rather than on HttpServletRequest and HttpServletResponse.
Spring MVC controllers can also return reactive types from Reactor 3, RxJava 2, RxJava 1, or any other library registered through the configured ReactiveAdapterRegistry. Single-valued types are handled as an equivalent of DeferredResult. Multi-valued types are handled as an equivalent of ResponseBodyEmitter or SseEmitter when a streaming media type (e.g. "text/event-stream", "application/stream+json") is requested.
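The choice described above can be sketched as a small decision function. This is a simplified illustration, not Spring's code: ReturnValueStrategyDemo, Strategy and choose are hypothetical names, plain string comparison stands in for MediaType.includes, and the fallback for a multi-valued type without a streaming media type (collecting the values into one deferred result) is an assumption that the paragraph above does not state.

```java
import java.util.List;
import java.util.Locale;

/** Hypothetical, simplified decision table for reactive return values. */
class ReturnValueStrategyDemo {

    enum Strategy { DEFERRED_RESULT, SSE_EMITTER, RESPONSE_BODY_EMITTER, COLLECTED_RESULT }

    /** Picks a handling strategy from the value arity and the requested media types. */
    static Strategy choose(boolean multiValue, List<String> acceptedMediaTypes) {
        if (!multiValue) {
            return Strategy.DEFERRED_RESULT;          // single value: like DeferredResult
        }
        if (accepts(acceptedMediaTypes, "text/event-stream")) {
            return Strategy.SSE_EMITTER;              // streamed as server-sent events
        }
        if (accepts(acceptedMediaTypes, "application/stream+json")) {
            return Strategy.RESPONSE_BODY_EMITTER;    // streamed as JSON elements
        }
        return Strategy.COLLECTED_RESULT;             // assumption: no streaming type requested
    }

    /** True if any accepted type equals the wanted type once parameters are dropped. */
    private static boolean accepts(List<String> acceptedMediaTypes, String wanted) {
        for (String type : acceptedMediaTypes) {
            String base = type.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
            if (base.equals(wanted)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, List.of("text/event-stream")));
        System.out.println(choose(true, List.of("application/json")));
        System.out.println(choose(false, List.of("application/json")));
    }
}
```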
sample code
@RestController
public class CarLocationController {
private final CarRepository repository;
public CarLocationController(CarRepository repository) {
this.repository = repository;
}
@GetMapping("/cars")
public Flux<Car> getCars() {
return this.repository.findAll().log();
}
...
}
$ curl -v -H "Accept:text/event-stream" http://localhost:8081/cars
* Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8081 (#0)
> GET /cars HTTP/1.1
> Host: localhost:8081
> User-Agent: curl/7.51.0
> Accept:text/event-stream
>
< HTTP/1.1 200
< Content-Type: text/event-stream;charset=UTF-8
< Transfer-Encoding: chunked
< Date: Tue, 12 Sep 2017 15:52:22 GMT
<
data:{"id":1,"location":{"longitude":40.740984,"latitude":-73.987904}}
data:{"id":2,"location":{"longitude":40.740940,"latitude":-73.988088}}
data:{"id":3,"location":{"longitude":40.740856,"latitude":-73.988072}}
...
data:{"id":98,"location":{"longitude":40.740876,"latitude":-73.987924}}
data:{"id":99,"location":{"longitude":40.740856,"latitude":-73.988056}}
data:{"id":100,"location":{"longitude":40.740852,"latitude":-73.988060}}
* Curl_http_done: called premature == 0
* Connection #0 to host localhost left intact
[restartedMain] 52:45 TomcatWebServer: Tomcat started on port(s): 8081 (http)
...
[http-nio-8081-exec-1] 52:55 DispatcherServlet: Published WebApplicationContext of servlet 'dispatcherServlet' as ServletContext attribute with name [org.springframework.web.servlet.FrameworkServlet.CONTEXT.dispatcherServlet]
[http-nio-8081-exec-1] 52:55 DispatcherServlet: FrameworkServlet 'dispatcherServlet': initialization completed in 27 ms
[http-nio-8081-exec-1] 52:55 DispatcherServlet: Servlet 'dispatcherServlet' configured successfully
[http-nio-8081-exec-1] 52:55 DispatcherServlet: DispatcherServlet with name 'dispatcherServlet' processing GET request for [/cars]
[http-nio-8081-exec-1] 52:55 RequestMappingHandlerMapping: Looking up handler method for path /cars
[http-nio-8081-exec-1] 52:55 RequestMappingHandlerMapping: Returning handler method [public reactor.core.publisher.Flux<car.Car> car.location.CarLocationController.getCars()]
[http-nio-8081-exec-1] 52:55 DispatcherServlet: Last-Modified value for [/cars] is: -1
[http-nio-8081-exec-1] 52:55 1: | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
[http-nio-8081-exec-1] 52:55 ReactiveTypeHandler: Subscribed to Publisher for SseEmitter@73873328
[http-nio-8081-exec-1] 52:55 1: | request(1)
[Thread-13] 52:55 1: | onNext(Car{id=1, location={40.740984, -73.987904}})
[MvcAsync1] 52:55 1: | request(1)
[MvcAsync1] 52:55 1: | onNext(Car{id=2, location={40.740940, -73.988088}})
[MvcAsync2] 52:56 1: | request(1)
[Thread-14] 52:56 1: | onNext(Car{id=3, location={40.740856, -73.988072}})
...
[MvcAsync97] 52:56 1: | request(1)
[MvcAsync97] 52:56 1: | onNext(Car{id=98, location={40.740876, -73.987924}})
[MvcAsync98] 52:56 1: | request(1)
[Thread-14] 52:56 1: | onNext(Car{id=99, location={40.740856, -73.988056}})
[MvcAsync99] 52:56 1: | request(1)
[MvcAsync99] 52:56 1: | onNext(Car{id=100, location={40.740852, -73.988060}})
[MvcAsync100] 52:56 1: | request(1)
[Thread-14] 52:56 1: | onComplete()
[MvcAsync101] 52:56 ReactiveTypeHandler: Publishing completed for SseEmitter@73873328
[MvcAsync101] 52:56 WebAsyncManager: Concurrent result value [null] - dispatching request to resume processing
[http-nio-8081-exec-2] 52:56 DispatcherServlet: DispatcherServlet with name 'dispatcherServlet' resumed processing GET request for [/cars]
[http-nio-8081-exec-2] 52:56 RequestMappingHandlerMapping: Looking up handler method for path /cars
[http-nio-8081-exec-2] 52:56 RequestMappingHandlerMapping: Returning handler method [public reactor.core.publisher.Flux<car.Car> car.location.CarLocationController.getCars()]
[http-nio-8081-exec-2] 52:56 DispatcherServlet: Last-Modified value for [/cars] is: -1
[http-nio-8081-exec-2] 52:56 RequestMappingHandlerAdapter: Found concurrent result value [null]
[http-nio-8081-exec-2] 52:56 DispatcherServlet: Null ModelAndView returned to DispatcherServlet with name 'dispatcherServlet': assuming HandlerAdapter completed request handling
[http-nio-8081-exec-2] 52:56 DispatcherServlet: Successfully completed request
blocking OutputStream
ResponseBodyEmitterReturnValueHandler
public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodReturnValueHandler {
private static final Log logger = LogFactory.getLog(ResponseBodyEmitterReturnValueHandler.class);
private final List<HttpMessageConverter<?>> messageConverters;
private final ReactiveTypeHandler reactiveHandler;
public ResponseBodyEmitterReturnValueHandler(List<HttpMessageConverter<?>> messageConverters) {
Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty");
this.messageConverters = messageConverters;
this.reactiveHandler = new ReactiveTypeHandler();
}
public ResponseBodyEmitterReturnValueHandler(List<HttpMessageConverter<?>> messageConverters, ReactiveAdapterRegistry reactiveRegistry, TaskExecutor executor, ContentNegotiationManager manager) {
Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty");
this.messageConverters = messageConverters;
1. this.reactiveHandler = new ReactiveTypeHandler(reactiveRegistry, executor, manager);
}
...
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if(returnValue == null) {
mavContainer.setRequestHandled(true);
} else {
3. HttpServletResponse response = (HttpServletResponse)webRequest.getNativeResponse(HttpServletResponse.class);
Assert.state(response != null, "No HttpServletResponse");
4. ServletServerHttpResponse outputMessage = new ServletServerHttpResponse(response);
if(returnValue instanceof ResponseEntity) {
...
ResponseBodyEmitter emitter;
if(returnValue instanceof ResponseBodyEmitter) {
emitter = (ResponseBodyEmitter)returnValue;
} else {
5. emitter = this.reactiveHandler.handleValue(returnValue, returnType, mavContainer, webRequest);
}
if(emitter != null) {
emitter.extendResponse(outputMessage);
outputMessage.getBody();
outputMessage.flush();
9. ResponseBodyEmitterReturnValueHandler.StreamingServletServerHttpResponse outputMessage1 = new ResponseBodyEmitterReturnValueHandler.StreamingServletServerHttpResponse(outputMessage);
DeferredResult deferredResult = new DeferredResult(emitter.getTimeout());
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, new Object[]{mavContainer});
11. ResponseBodyEmitterReturnValueHandler.HttpMessageConvertingHandler handler = new ResponseBodyEmitterReturnValueHandler.HttpMessageConvertingHandler(outputMessage1, deferredResult);
12. emitter.initialize(handler);
}
}
}
...
private static class StreamingServletServerHttpResponse implements ServerHttpResponse {
private final ServerHttpResponse delegate;
private final HttpHeaders mutableHeaders = new HttpHeaders();
public StreamingServletServerHttpResponse(ServerHttpResponse delegate) {
10. this.delegate = delegate;
this.mutableHeaders.putAll(delegate.getHeaders());
}
public void setStatusCode(HttpStatus status) {
this.delegate.setStatusCode(status);
}
public HttpHeaders getHeaders() {
return this.mutableHeaders;
}
public OutputStream getBody() throws IOException {
19. return this.delegate.getBody();
}
public void flush() throws IOException {
this.delegate.flush();
}
public void close() {
this.delegate.close();
}
}
...
private class HttpMessageConvertingHandler implements Handler {
private final ServerHttpResponse outputMessage;
private final DeferredResult<?> deferredResult;
public HttpMessageConvertingHandler(ServerHttpResponse outputMessage, DeferredResult<?> deferredResult) {
this.outputMessage = outputMessage;
this.deferredResult = deferredResult;
}
public void send(Object data, @Nullable MediaType mediaType) throws IOException {
15. this.sendInternal(data, mediaType);
}
private <T> void sendInternal(T data, @Nullable MediaType mediaType) throws IOException {
if(ResponseBodyEmitterReturnValueHandler.logger.isTraceEnabled()) {
ResponseBodyEmitterReturnValueHandler.logger.trace("Writing [" + data + "]");
}
for(HttpMessageConverter<?> converter : ResponseBodyEmitterReturnValueHandler.this.messageConverters) {
if(converter.canWrite(data.getClass(), mediaType)) {
16. ((HttpMessageConverter<T>)converter).write(data, mediaType, this.outputMessage);
this.outputMessage.flush();
return;
}
}
throw new IllegalArgumentException("No suitable converter for " + data.getClass());
}
...
ReactiveTypeHandler
class ReactiveTypeHandler {
private static final long STREAMING_TIMEOUT_VALUE = -1L;
private static Log logger = LogFactory.getLog(ReactiveTypeHandler.class);
private final ReactiveAdapterRegistry reactiveRegistry;
private final TaskExecutor taskExecutor;
private final ContentNegotiationManager contentNegotiationManager;
public ReactiveTypeHandler() {
this(new ReactiveAdapterRegistry(), new SyncTaskExecutor(), new ContentNegotiationManager());
}
ReactiveTypeHandler(ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager) {
Assert.notNull(registry, "ReactiveAdapterRegistry is required");
Assert.notNull(executor, "TaskExecutor is required");
Assert.notNull(manager, "ContentNegotiationManager is required");
this.reactiveRegistry = registry;
2. this.taskExecutor = executor;
this.contentNegotiationManager = manager;
}
public boolean isReactiveType(Class<?> type) {
return this.reactiveRegistry.hasAdapters() && this.reactiveRegistry.getAdapter(type) != null;
}
@Nullable
public ResponseBodyEmitter handleValue(Object returnValue, MethodParameter returnType, ModelAndViewContainer mav, NativeWebRequest request) throws Exception {
...
if(adapter.isMultiValue()) {
if(mediaTypes.stream().anyMatch(MediaType.TEXT_EVENT_STREAM::includes) || ServerSentEvent.class.isAssignableFrom(elementClass)) {
6. SseEmitter result2 = new SseEmitter(Long.valueOf(-1L));
7. (new ReactiveTypeHandler.SseEmitterSubscriber(result2, this.taskExecutor)).connect(adapter, returnValue);
return result2;
}
...
}
...
private static class SseEmitterSubscriber extends ReactiveTypeHandler.AbstractEmitterSubscriber {
SseEmitterSubscriber(SseEmitter sseEmitter, TaskExecutor executor) {
7.1. super(sseEmitter, executor);
}
protected void send(Object element) throws IOException {
if(element instanceof ServerSentEvent) {
ServerSentEvent event = (ServerSentEvent)element;
((SseEmitter)this.getEmitter()).send(this.adapt(event));
} else {
14. this.getEmitter().send(element, MediaType.APPLICATION_JSON);
}
}
...
}
private abstract static class AbstractEmitterSubscriber implements Subscriber<Object>, Runnable {
private final ResponseBodyEmitter emitter;
private final TaskExecutor taskExecutor;
@Nullable
private Subscription subscription;
private final AtomicReference<Object> elementRef = new AtomicReference();
@Nullable
private Throwable error;
private volatile boolean terminated;
private final AtomicLong executing = new AtomicLong();
private volatile boolean done;
protected AbstractEmitterSubscriber(ResponseBodyEmitter emitter, TaskExecutor executor) {
this.emitter = emitter;
this.taskExecutor = executor;
}
public void connect(ReactiveAdapter adapter, Object returnValue) {
Publisher publisher = adapter.toPublisher(returnValue);
7.2. publisher.subscribe(this);
}
protected ResponseBodyEmitter getEmitter() {
return this.emitter;
}
public final void onSubscribe(Subscription subscription) {
...
this.emitter.onError(this.emitter::completeWithError);
8. subscription.request(1L);
}
...
public void run() {
if(this.done) {
this.elementRef.lazySet((Object)null);
} else {
boolean isTerminated = this.terminated;
Object element = this.elementRef.get();
if(element != null) {
this.elementRef.lazySet((Object)null);
Assert.state(this.subscription != null, "No subscription");
try {
13. this.send(element);
21. this.subscription.request(1L);
} catch (Throwable var4) {
if(ReactiveTypeHandler.logger.isDebugEnabled()) {
ReactiveTypeHandler.logger.debug("Send error for " + this.emitter, var4);
}
this.terminate();
return;
}
}
...
}
}
...
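The hand-off in AbstractEmitterSubscriber (park the element in an AtomicReference, let the TaskExecutor run the send, then request one more) can be sketched in isolation. This is a reduced illustration, not Spring's code: HandOffDemo, HandOffSubscriber and CountingSubscription are hypothetical names, a list stands in for the emitter, java.util.concurrent.Flow stands in for the Reactive Streams interfaces, and the demo uses a same-thread executor (Runnable::run) so that the result is deterministic.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the subscriber-to-executor hand-off; not Spring's code. */
class HandOffDemo {

    /** Parks each element, lets an Executor perform the send, then asks for one more. */
    static final class HandOffSubscriber implements Flow.Subscriber<String>, Runnable {
        private final Executor executor;
        private final AtomicReference<String> elementRef = new AtomicReference<>();
        private volatile Flow.Subscription subscription;
        private volatile boolean terminated;
        volatile boolean done;
        final List<String> sent = new CopyOnWriteArrayList<>(); // stands in for the emitter

        HandOffSubscriber(Executor executor) { this.executor = executor; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: exactly one element
        }

        @Override public void onNext(String element) {
            elementRef.set(element); // park the element ...
            executor.execute(this);  // ... and let the executor perform the send
        }

        @Override public void onError(Throwable t) { terminated = true; executor.execute(this); }

        @Override public void onComplete() { terminated = true; executor.execute(this); }

        @Override public void run() {
            if (done) { return; }
            boolean isTerminated = terminated; // read before the element, as in the listing
            String element = elementRef.getAndSet(null);
            if (element != null) {
                sent.add(element);        // the potentially blocking write would happen here
                subscription.request(1);  // only now ask for the next element
            }
            if (isTerminated) { done = true; }
        }
    }

    /** Test double that only counts how much demand was signalled. */
    static final class CountingSubscription implements Flow.Subscription {
        final AtomicLong requested = new AtomicLong();
        @Override public void request(long n) { requested.addAndGet(n); }
        @Override public void cancel() { }
    }

    public static void main(String[] args) {
        HandOffSubscriber subscriber = new HandOffSubscriber(Runnable::run);
        CountingSubscription subscription = new CountingSubscription();
        subscriber.onSubscribe(subscription);
        subscriber.onNext("car-1");
        subscriber.onNext("car-2");
        subscriber.onComplete();
        // prints: [car-1, car-2] requests=3
        System.out.println(subscriber.sent + " requests=" + subscription.requested.get());
    }
}
```

Because the next request(1) is issued only after the send returns, a slow blocking write slows the publisher down instead of letting elements pile up. In the server log the same pattern appears as request(1) calls on MvcAsync threads.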
AbstractHttpMessageConverter<T>
public abstract class AbstractHttpMessageConverter<T> implements HttpMessageConverter<T> {
...
public final void write(final T t, @Nullable MediaType contentType, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException {
final HttpHeaders headers = outputMessage.getHeaders();
this.addDefaultHeaders(headers, t, contentType);
17. if(outputMessage instanceof StreamingHttpOutputMessage) {
...
} else {
18. this.writeInternal(t, outputMessage);
outputMessage.getBody().flush();
}
}
...
ServletServerHttpResponse
public class ServletServerHttpResponse implements ServerHttpResponse {
private final HttpServletResponse servletResponse;
private final HttpHeaders headers;
private boolean headersWritten = false;
private boolean bodyUsed = false;
public ServletServerHttpResponse(HttpServletResponse servletResponse) {
Assert.notNull(servletResponse, "HttpServletResponse must not be null");
this.servletResponse = servletResponse;
this.headers = new ServletServerHttpResponse.ServletResponseHttpHeaders();
}
...
public OutputStream getBody() throws IOException {
this.bodyUsed = true;
this.writeHeaders();
20. return this.servletResponse.getOutputStream();
}
public void flush() throws IOException {
this.writeHeaders();
if(this.bodyUsed) {
this.servletResponse.flushBuffer();
}
}
...
1. this.reactiveHandler <- new ReactiveTypeHandler(reactiveRegistry, executor, manager)
2. this.taskExecutor <- executor (a SimpleAsyncTaskExecutor; its threads appear as MvcAsync in the log)
3. HttpServletResponse response <- webRequest.getNativeResponse(HttpServletResponse.class)
4. ServletServerHttpResponse outputMessage <- new ServletServerHttpResponse(response)
5. emitter <- this.reactiveHandler.handleValue(returnValue, returnType, mavContainer, webRequest)
6. SseEmitter result2 <- new SseEmitter(Long.valueOf(-1L))
7. (new ReactiveTypeHandler.SseEmitterSubscriber(result2, this.taskExecutor)).connect(adapter, returnValue)
7.1. The SseEmitterSubscriber constructor calls the AbstractEmitterSubscriber super constructor.
7.2. connect() converts the return value to a Publisher and subscribes the SseEmitterSubscriber to it.
8. On subscription, onSubscribe requests the first element with subscription.request(1).
9. StreamingServletServerHttpResponse outputMessage1 <- new StreamingServletServerHttpResponse(outputMessage)
10. outputMessage implements the ServerHttpResponse interface, so it can be passed in as the delegate of StreamingServletServerHttpResponse.
11. HttpMessageConvertingHandler handler <- new HttpMessageConvertingHandler(outputMessage1, deferredResult)
12. emitter.initialize(handler)
13. run() sends the element with this.send(element).
14. SseEmitterSubscriber.send() gets the emitter and sends the element with MediaType.APPLICATION_JSON.
15. When the emitter sends data, HttpMessageConvertingHandler.send() calls sendInternal().
16. The first HttpMessageConverter that can write the element writes it.
17. outputMessage (the StreamingServletServerHttpResponse outputMessage1) is not an instance of StreamingHttpOutputMessage.
18. The converter therefore calls writeInternal with outputMessage (outputMessage1), which calls getBody().
19. The delegate call reaches getBody() on the ServletServerHttpResponse outputMessage.
20. this.servletResponse (the HttpServletResponse response) returns an OutputStream from getOutputStream().
getOutputStream() returns a ServletOutputStream, an output stream for sending binary data to the client.
21. The subscription requests one more element with request(1), and the cycle repeats until the stream completes.
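The delegation at the end of this trace, where the streaming wrapper keeps its own copy of the headers but hands back the wrapped response's blocking OutputStream, can be sketched without any Servlet or Spring classes. All names below (DelegatingResponseDemo, Response, BaseResponse, StreamingResponse, writeEvent) are hypothetical stand-ins, and the "data:...\n\n" framing is only modelled on the curl output shown earlier.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the wrapper/delegate pair; not Spring's code. */
class DelegatingResponseDemo {

    interface Response {
        Map<String, String> getHeaders();
        OutputStream getBody() throws IOException;
        void flush() throws IOException;
    }

    /** Stands in for ServletServerHttpResponse: owns the real, blocking output stream. */
    static final class BaseResponse implements Response {
        private final Map<String, String> headers = new HashMap<>();
        final ByteArrayOutputStream wire = new ByteArrayOutputStream();
        int flushes;

        @Override public Map<String, String> getHeaders() { return headers; }
        @Override public OutputStream getBody() { return wire; }
        @Override public void flush() { flushes++; }
    }

    /** Stands in for StreamingServletServerHttpResponse: private header copy, delegated body. */
    static final class StreamingResponse implements Response {
        private final Response delegate;
        private final Map<String, String> mutableHeaders = new HashMap<>();

        StreamingResponse(Response delegate) {
            this.delegate = delegate;
            this.mutableHeaders.putAll(delegate.getHeaders());
        }

        @Override public Map<String, String> getHeaders() { return mutableHeaders; }
        @Override public OutputStream getBody() throws IOException { return delegate.getBody(); }
        @Override public void flush() throws IOException { delegate.flush(); }
    }

    /** Mimics a converter: touches the headers, writes the body, flushes. */
    static void writeEvent(Response out, String json) {
        try {
            out.getHeaders().put("Content-Type", "application/json"); // lands in the copy only
            out.getBody().write(("data:" + json + "\n\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        BaseResponse base = new BaseResponse();
        base.getHeaders().put("Content-Type", "text/event-stream");
        writeEvent(new StreamingResponse(base), "{\"id\":1}");
        System.out.print(new String(base.wire.toByteArray(), StandardCharsets.UTF_8));
        System.out.println(base.getHeaders());
    }
}
```

In this sketch the header change made during the write reaches only the wrapper's copy, while the bytes go straight to the underlying stream. The write is therefore still a blocking OutputStream write, even though the data source is reactive.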