LoginSignup
1
1

More than 5 years have passed since last update.

[Spring Framework] Servlet streaming back pressure against blocking

Posted at

infinite stream

accept header

  • text/event-stream
  • application/stream+json

back pressure

  • request(n)
  • write, flush
  • repeat

Server Side
webflux-overview.png

Each runtime is adapted to a reactive ServerHttpRequest and ServerHttpResponse exposing the body of the request and response as Flux, rather than InputStream and OutputStream, with reactive backpressure.

Annotation-based Programming Model

The same @Controller programming model and the same annotations used in Spring MVC are also supported in WebFlux. The main difference is that the underlying core, framework contracts — i.e. HandlerMapping, HandlerAdapter, are non-blocking and operate on the reactive ServerHttpRequest and ServerHttpResponse rather than on the HttpServletRequest and HttpServletResponse.

Supported method return types

Reactive types from Reactor 3, RxJava 2, RxJava 1 or others registered through the configured ReactiveAdapterRegistry can be returned as an alternative equivalent to using DeferredResult for single-valued types, or ResponseBodyEmitter and SseEmitter for multi-valued reactive types where a streaming media type (e.g. "text/event-stream", "application/json+stream") is requested.

sample code

demo-reactive-spring

@RestController
public class CarLocationController {

    private final CarRepository repository;


    public CarLocationController(CarRepository repository) {
        this.repository = repository;
    }

    @GetMapping("/cars")
    public Flux<Car> getCars() {
        return this.repository.findAll().log();
    }

    ...

}
$ curl -v -H "Accept:text/event-stream" http://localhost:8081/cars
*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 8081 (#0)
> GET /cars HTTP/1.1
> Host: localhost:8081
> User-Agent: curl/7.51.0
> Accept:text/event-stream
> 
< HTTP/1.1 200 
< Content-Type: text/event-stream;charset=UTF-8
< Transfer-Encoding: chunked
< Date: Tue, 12 Sep 2017 15:52:22 GMT
< 
data:{"id":1,"location":{"longitude":40.740984,"latitude":-73.987904}}

data:{"id":2,"location":{"longitude":40.740940,"latitude":-73.988088}}

data:{"id":3,"location":{"longitude":40.740856,"latitude":-73.988072}}

...

data:{"id":98,"location":{"longitude":40.740876,"latitude":-73.987924}}

data:{"id":99,"location":{"longitude":40.740856,"latitude":-73.988056}}

data:{"id":100,"location":{"longitude":40.740852,"latitude":-73.988060}}

* Curl_http_done: called premature == 0
* Connection #0 to host localhost left intact
[restartedMain] 52:45 TomcatWebServer: Tomcat started on port(s): 8081 (http)

...

[http-nio-8081-exec-1] 52:55 DispatcherServlet: Published WebApplicationContext of servlet 'dispatcherServlet' as ServletContext attribute with name [org.springframework.web.servlet.FrameworkServlet.CONTEXT.dispatcherServlet]
[http-nio-8081-exec-1] 52:55 DispatcherServlet: FrameworkServlet 'dispatcherServlet': initialization completed in 27 ms
[http-nio-8081-exec-1] 52:55 DispatcherServlet: Servlet 'dispatcherServlet' configured successfully
[http-nio-8081-exec-1] 52:55 DispatcherServlet: DispatcherServlet with name 'dispatcherServlet' processing GET request for [/cars]
[http-nio-8081-exec-1] 52:55 RequestMappingHandlerMapping: Looking up handler method for path /cars
[http-nio-8081-exec-1] 52:55 RequestMappingHandlerMapping: Returning handler method [public reactor.core.publisher.Flux<car.Car> car.location.CarLocationController.getCars()]
[http-nio-8081-exec-1] 52:55 DispatcherServlet: Last-Modified value for [/cars] is: -1
[http-nio-8081-exec-1] 52:55 1: | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
[http-nio-8081-exec-1] 52:55 ReactiveTypeHandler: Subscribed to Publisher for SseEmitter@73873328
[http-nio-8081-exec-1] 52:55 1: | request(1)
[Thread-13] 52:55 1: | onNext(Car{id=1, location={40.740984, -73.987904}})
[MvcAsync1] 52:55 1: | request(1)
[MvcAsync1] 52:55 1: | onNext(Car{id=2, location={40.740940, -73.988088}})
[MvcAsync2] 52:56 1: | request(1)
[Thread-14] 52:56 1: | onNext(Car{id=3, location={40.740856, -73.988072}})

...

[MvcAsync97] 52:56 1: | request(1)
[MvcAsync97] 52:56 1: | onNext(Car{id=98, location={40.740876, -73.987924}})
[MvcAsync98] 52:56 1: | request(1)
[Thread-14] 52:56 1: | onNext(Car{id=99, location={40.740856, -73.988056}})
[MvcAsync99] 52:56 1: | request(1)
[MvcAsync99] 52:56 1: | onNext(Car{id=100, location={40.740852, -73.988060}})
[MvcAsync100] 52:56 1: | request(1)
[Thread-14] 52:56 1: | onComplete()
[MvcAsync101] 52:56 ReactiveTypeHandler: Publishing completed for SseEmitter@73873328
[MvcAsync101] 52:56 WebAsyncManager: Concurrent result value [null] - dispatching request to resume processing
[http-nio-8081-exec-2] 52:56 DispatcherServlet: DispatcherServlet with name 'dispatcherServlet' resumed processing GET request for [/cars]
[http-nio-8081-exec-2] 52:56 RequestMappingHandlerMapping: Looking up handler method for path /cars
[http-nio-8081-exec-2] 52:56 RequestMappingHandlerMapping: Returning handler method [public reactor.core.publisher.Flux<car.Car> car.location.CarLocationController.getCars()]
[http-nio-8081-exec-2] 52:56 DispatcherServlet: Last-Modified value for [/cars] is: -1
[http-nio-8081-exec-2] 52:56 RequestMappingHandlerAdapter: Found concurrent result value [null]
[http-nio-8081-exec-2] 52:56 DispatcherServlet: Null ModelAndView returned to DispatcherServlet with name 'dispatcherServlet': assuming HandlerAdapter completed request handling
[http-nio-8081-exec-2] 52:56 DispatcherServlet: Successfully completed request

blocking OutputStream

ResponseBodyEmitterReturnValueHandler

public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodReturnValueHandler {
    private static final Log logger = LogFactory.getLog(ResponseBodyEmitterReturnValueHandler.class);
    private final List<HttpMessageConverter<?>> messageConverters;
    private final ReactiveTypeHandler reactiveHandler;

    public ResponseBodyEmitterReturnValueHandler(List<HttpMessageConverter<?>> messageConverters) {
        Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty");
        this.messageConverters = messageConverters;
        this.reactiveHandler = new ReactiveTypeHandler();
    }

    public ResponseBodyEmitterReturnValueHandler(List<HttpMessageConverter<?>> messageConverters, ReactiveAdapterRegistry reactiveRegistry, TaskExecutor executor, ContentNegotiationManager manager) {
        Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty");
        this.messageConverters = messageConverters;
1.      this.reactiveHandler = new ReactiveTypeHandler(reactiveRegistry, executor, manager);
    }

    ...

    public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
        if(returnValue == null) {
            mavContainer.setRequestHandled(true);
        } else {
3.          HttpServletResponse response = (HttpServletResponse)webRequest.getNativeResponse(HttpServletResponse.class);
            Assert.state(response != null, "No HttpServletResponse");
4.          ServletServerHttpResponse outputMessage = new ServletServerHttpResponse(response);
            if(returnValue instanceof ResponseEntity) {

            ...

            ResponseBodyEmitter emitter;
            if(returnValue instanceof ResponseBodyEmitter) {
                emitter = (ResponseBodyEmitter)returnValue;
            } else {
5.              emitter = this.reactiveHandler.handleValue(returnValue, returnType, mavContainer, webRequest);
            }

            if(emitter != null) {
                emitter.extendResponse(outputMessage);
                outputMessage.getBody();
                outputMessage.flush();
9.              ResponseBodyEmitterReturnValueHandler.StreamingServletServerHttpResponse outputMessage1 = new ResponseBodyEmitterReturnValueHandler.StreamingServletServerHttpResponse(outputMessage);
                DeferredResult deferredResult = new DeferredResult(emitter.getTimeout());
                WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, new Object[]{mavContainer});
11.             ResponseBodyEmitterReturnValueHandler.HttpMessageConvertingHandler handler = new ResponseBodyEmitterReturnValueHandler.HttpMessageConvertingHandler(outputMessage1, deferredResult);
12.             emitter.initialize(handler);
            }
        }
    }

    ...

    private static class StreamingServletServerHttpResponse implements ServerHttpResponse {
        private final ServerHttpResponse delegate;
        private final HttpHeaders mutableHeaders = new HttpHeaders();

        public StreamingServletServerHttpResponse(ServerHttpResponse delegate) {
10.         this.delegate = delegate;
            this.mutableHeaders.putAll(delegate.getHeaders());
        }

        public void setStatusCode(HttpStatus status) {
            this.delegate.setStatusCode(status);
        }

        public HttpHeaders getHeaders() {
            return this.mutableHeaders;
        }

        public OutputStream getBody() throws IOException {
19.         return this.delegate.getBody();
        }

        public void flush() throws IOException {
            this.delegate.flush();
        }

        public void close() {
            this.delegate.close();
        }
    }

    ...

    private class HttpMessageConvertingHandler implements Handler {
        private final ServerHttpResponse outputMessage;
        private final DeferredResult<?> deferredResult;

        public HttpMessageConvertingHandler(ServerHttpResponse var1, DeferredResult<?> outputMessage) {
            this.outputMessage = outputMessage;
            this.deferredResult = deferredResult;
        }

        public void send(Object data, @Nullable MediaType mediaType) throws IOException {
15.          this.sendInternal(data, mediaType);
        }

        private <T> void sendInternal(T data, @Nullable MediaType mediaType) throws IOException {
            if(ResponseBodyEmitterReturnValueHandler.logger.isTraceEnabled()) {
                ResponseBodyEmitterReturnValueHandler.logger.trace("Writing [" + data + "]");
            }

            Iterator var3 = ResponseBodyEmitterReturnValueHandler.this.messageConverters.iterator();

            HttpMessageConverter converter;
            do {
                if(!var3.hasNext()) {
                    throw new IllegalArgumentException("No suitable converter for " + data.getClass());
                }

                converter = (HttpMessageConverter)var3.next();
            } while(!converter.canWrite(data.getClass(), mediaType));

16.         converter.write(data, mediaType, this.outputMessage);
            this.outputMessage.flush();
        }

        ...

ReactiveTypeHandler

class ReactiveTypeHandler {
    private static final long STREAMING_TIMEOUT_VALUE = -1L;
    private static Log logger = LogFactory.getLog(ReactiveTypeHandler.class);
    private final ReactiveAdapterRegistry reactiveRegistry;
    private final TaskExecutor taskExecutor;
    private final ContentNegotiationManager contentNegotiationManager;

    public ReactiveTypeHandler() {
        this(new ReactiveAdapterRegistry(), new SyncTaskExecutor(), new ContentNegotiationManager());
    }

    ReactiveTypeHandler(ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager) {
        Assert.notNull(registry, "ReactiveAdapterRegistry is required");
        Assert.notNull(executor, "TaskExecutor is required");
        Assert.notNull(manager, "ContentNegotiationManager is required");
        this.reactiveRegistry = registry;
2.      this.taskExecutor = executor;
        this.contentNegotiationManager = manager;
    }

    public boolean isReactiveType(Class<?> type) {
        return this.reactiveRegistry.hasAdapters() && this.reactiveRegistry.getAdapter(type) != null;
    }

    @Nullable
    public ResponseBodyEmitter handleValue(Object returnValue, MethodParameter returnType, ModelAndViewContainer mav, NativeWebRequest request) throws Exception {

        ...

        if(adapter.isMultiValue()) {
            Stream var10000 = mediaTypes.stream();
            MediaType var10001 = MediaType.TEXT_EVENT_STREAM;
            MediaType.TEXT_EVENT_STREAM.getClass();
            if(var10000.anyMatch(var10001::includes) || ServerSentEvent.class.isAssignableFrom(elementClass)) {
6.              SseEmitter result2 = new SseEmitter(Long.valueOf(-1L));
7.              (new ReactiveTypeHandler.SseEmitterSubscriber(result2, this.taskExecutor)).connect(adapter, returnValue);
                return result2;
            }

            ...

        }

    ...

    private static class SseEmitterSubscriber extends ReactiveTypeHandler.AbstractEmitterSubscriber {
        SseEmitterSubscriber(SseEmitter sseEmitter, TaskExecutor executor) {
7.1.        super(sseEmitter, executor);
        }

        protected void send(Object element) throws IOException {
            if(element instanceof ServerSentEvent) {
                ServerSentEvent event = (ServerSentEvent)element;
                ((SseEmitter)this.getEmitter()).send(this.adapt(event));
            } else {
14.             this.getEmitter().send(element, MediaType.APPLICATION_JSON);
            }

        }

        ...

    }

    private abstract static class AbstractEmitterSubscriber implements Subscriber<Object>, Runnable {
        private final ResponseBodyEmitter emitter;
        private final TaskExecutor taskExecutor;
        @Nullable
        private Subscription subscription;
        private final AtomicReference<Object> elementRef = new AtomicReference();
        @Nullable
        private Throwable error;
        private volatile boolean terminated;
        private final AtomicLong executing = new AtomicLong();
        private volatile boolean done;

        protected AbstractEmitterSubscriber(ResponseBodyEmitter emitter, TaskExecutor executor) {
            this.emitter = emitter;
            this.taskExecutor = executor;
        }

        public void connect(ReactiveAdapter adapter, Object returnValue) {
            Publisher publisher = adapter.toPublisher(returnValue);
7.2.        publisher.subscribe(this);
        }

        protected ResponseBodyEmitter getEmitter() {
            return this.emitter;
        }

        public final void onSubscribe(Subscription subscription) {

            ...

            ResponseBodyEmitter var10000 = this.emitter;
            ResponseBodyEmitter var10001 = this.emitter;
            this.emitter.getClass();
            var10000.onError(var10001::completeWithError);
8.          subscription.request(1L);
        }

        ...

        public void run() {
            if(this.done) {
                this.elementRef.lazySet((Object)null);
            } else {
                boolean isTerminated = this.terminated;
                Object element = this.elementRef.get();
                if(element != null) {
                    this.elementRef.lazySet((Object)null);
                    Assert.state(this.subscription != null, "No subscription");

                    try {
13.                     this.send(element);
21.                     this.subscription.request(1L);
                    } catch (Throwable var4) {
                        if(ReactiveTypeHandler.logger.isDebugEnabled()) {
                            ReactiveTypeHandler.logger.debug("Send error for " + this.emitter, var4);
                        }

                        this.terminate();
                        return;
                    }
                }

                ...

            }
        }

        ...

AbstractHttpMessageConverter<T>

public abstract class AbstractHttpMessageConverter<T> implements HttpMessageConverter<T> {

    ...

    public final void write(final T t, @Nullable MediaType contentType, HttpOutputMessage outputMessage) throws IOException, HttpMessageNotWritableException {
        final HttpHeaders headers = outputMessage.getHeaders();
        this.addDefaultHeaders(headers, t, contentType);
17.     if(outputMessage instanceof StreamingHttpOutputMessage) {
            ...
        } else {
18.         this.writeInternal(t, outputMessage);
            outputMessage.getBody().flush();
        }

    }

    ...

ServletServerHttpResponse

public class ServletServerHttpResponse implements ServerHttpResponse {
    private final HttpServletResponse servletResponse;
    private final HttpHeaders headers;
    private boolean headersWritten = false;
    private boolean bodyUsed = false;

    public ServletServerHttpResponse(HttpServletResponse servletResponse) {
        Assert.notNull(servletResponse, "HttpServletResponse must not be null");
        this.servletResponse = servletResponse;
        this.headers = new ServletServerHttpResponse.ServletResponseHttpHeaders();
    }

    ...

    public OutputStream getBody() throws IOException {
        this.bodyUsed = true;
        this.writeHeaders();
20.     return this.servletResponse.getOutputStream();
    }

    public void flush() throws IOException {
        this.writeHeaders();
        if(this.bodyUsed) {
            this.servletResponse.flushBuffer();
        }

    }

    ...

  1. this.reactiveHandler <- new ReactiveTypeHandler(reactiveRegistry, executor, manager)
  2. this.taskExecutor <- executor (SimpleAsyncTaskExecutor MvcAsync)
  3. HttpServletResponse response <- webRequest.getNativeResponse(HttpServletResponse.class)
  4. ServletServerHttpResponse outputMessage <- new ServletServerHttpResponse(response)
  5. emitter = this.reactiveHandler.handleValue(returnValue, returnType, mavContainer, webRequest)
  6. SseEmitter result2 <- new SseEmitter(Long.valueOf(-1L))
  7. (new ReactiveTypeHandler.SseEmitterSubscriber(result2, this.taskExecutor)).connect(adapter, returnValue)
    1. SseEmitterSubscriber constructor call super AbstractEmitterSubscriber
    2. to connect to let publisher subscribe SseEmitterSubscriber
  8. subscription request 1 data when subscribe
  9. StreamingServletServerHttpResponse outputMessage1 <- new StreamingServletServerHttpResponse(outputMessage)
  10. outputMessage implements ServerHttpResponse interface so it can pass to StreamingServletServerHttpResponse delegate
  11. HttpMessageConvertingHandler handler <- new HttpMessageConvertingHandler(outputMessage1, deferredResult);
  12. emitter.initialize(handler)
  13. send data
  14. get emitter to send data
  15. to call sendInternal when emitter send data
  16. HttpMessageConverter write data
  17. outputMessage (StreamingServletServerHttpResponse outputMessage1) is not instanceof StreamingHttpOutputMessage
  18. each HttpMessageConverter call writeInternal with outputMessage (StreamingServletServerHttpResponse outputMessage1) to call getBody
  19. delegate call ServletServerHttpResponse outputMessage getBody
  20. this.servletResponse (HttpServletResponse response) call getOutputStream() return OutputStream

    getOutputStream return a ServletOutputStream to provide an output stream for sending binary data to the client

  21. subscription request 1 data again until complete

ref.

Servlet vs Reactive Stacks in Five Use Cases

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1