java spring-webflux project-reactor

Spring WebFlux does not catch exception


I’m trying to handle an exception thrown in a Spring WebFlux controller. When i[0] reaches 3, the exception handler method is invoked and runs, but the value it returns never appears in the response, and the client behaves as though no exception was thrown anywhere in the app. How can I resolve this problem?

controller:

@GetMapping("all")
public ResponseEntity<Flux<CustomerDTO>> findAll(@RequestParam(defaultValue = "0") Integer page,
                                                 @RequestParam(defaultValue = "10") Integer pageSize) {
    int[] i = {0};
    return ResponseEntity.ok(customerService.findAll(Paginator.of(page, pageSize))
            .flatMap(dto -> {
                if (i[0] == 3)
                    return Mono.error(new RuntimeException("ERROR"));
                i[0]++;
                return Mono.just(dto);
            }));
}

exception handler:

@RestControllerAdvice
public class GlobalExceptionHandler {
    @ExceptionHandler(Exception.class)
    public ResponseEntity<Map<String, Object>> handleException(Exception ex) {
        Map<String, Object> response = new HashMap<>();
        response.put("message", ex.getMessage());
        response.put("status", HttpStatus.BAD_REQUEST.value());
        return new ResponseEntity<>(response, HttpStatus.BAD_REQUEST);
    }
}

Postman output:

Connected to http://localhost:8080/customer/all
{"id":8,"name":"eisa","email":"aaa@gmail.com"}
{"id":1,"name":"sam","email":"vvv@yahoo.com"}
{"id":9,"name":"yunos","email":"zzz@gmail.com"}
Connection closed

console output:

java.lang.UnsupportedOperationException: null
    at org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:112) ~[spring-web-6.2.12.jar:6.2.12]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoFlatMap] :
    reactor.core.publisher.Mono.flatMap(Mono.java:3179)
    org.springframework.web.reactive.result.method.annotation.ResponseEntityResultHandler.handleResult(ResponseEntityResultHandler.java:151)
Error has been observed at the following site(s):
    *__________Mono.flatMap ⇢ at org.springframework.web.reactive.result.method.annotation.ResponseEntityResultHandler.handleResult(ResponseEntityResultHandler.java:151)
    |_           checkpoint ⇢ Exception handler org.isoft.webflux.exception.GlobalExceptionHandler#handleResponseStatusException(Exception), error="ERRORRRRR" [DispatcherHandler]
    *__________Mono.flatMap ⇢ at org.springframework.web.reactive.DispatcherHandler.lambda$handleResultMono$6(DispatcherHandler.java:177)
    *____Mono.onErrorResume ⇢ at org.springframework.web.reactive.DispatcherHandler.lambda$handleResultMono$7(DispatcherHandler.java:176)
    *__________Mono.flatMap ⇢ at org.springframework.web.reactive.DispatcherHandler.handleResultMono(DispatcherHandler.java:172)
    *__________Mono.flatMap ⇢ at org.springframework.web.reactive.DispatcherHandler.handle(DispatcherHandler.java:154)
    *____________Mono.defer ⇢ at org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:106)
    |_       Mono.doOnError ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:84)
    |_   Mono.onErrorResume ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:85)
    |_       Mono.doOnError ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:84)
    |_   Mono.onErrorResume ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:85)
    |_       Mono.doOnError ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:84)
    |_   Mono.onErrorResume ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler.handle(ExceptionHandlingWebHandler.java:85)
    |_     Mono.doOnSuccess ⇢ at org.springframework.web.server.adapter.HttpWebHandlerAdapter.handle(HttpWebHandlerAdapter.java:299)
    |_   Mono.onErrorResume ⇢ at org.springframework.web.server.adapter.HttpWebHandlerAdapter.handle(HttpWebHandlerAdapter.java:300)
    |_             Mono.tap ⇢ at org.springframework.web.server.adapter.HttpWebHandlerAdapter.handle(HttpWebHandlerAdapter.java:301)
    *____________Mono.error ⇢ at org.springframework.web.server.handler.ExceptionHandlingWebHandler$CheckpointInsertingHandler.handle(ExceptionHandlingWebHandler.java:106)
    |_           checkpoint ⇢ HTTP GET "/customer/all" [ExceptionHandlingWebHandler]
    *____________Mono.error ⇢ at org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler.handle(AbstractErrorWebExceptionHandler.java:293)
    *____________Mono.error ⇢ at org.springframework.web.server.handler.ResponseStatusExceptionHandler.handle(ResponseStatusExceptionHandler.java:68)
    *____________Mono.error ⇢ at org.springframework.web.server.adapter.HttpWebHandlerAdapter.handleUnresolvedError(HttpWebHandlerAdapter.java:358)
    *_____________Mono.then ⇢ at org.springframework.web.server.adapter.HttpWebHandlerAdapter.handle(HttpWebHandlerAdapter.java:302)
    *_____________Mono.then ⇢ at org.springframework.web.server.adapter.HttpWebHandlerAdapter.handle(HttpWebHandlerAdapter.java:303)
    |_       Mono.doOnError ⇢ at org.springframework.http.server.reactive.ReactorHttpHandlerAdapter.apply(ReactorHttpHandlerAdapter.java:66)
    |_     Mono.doOnSuccess ⇢ at org.springframework.http.server.reactive.ReactorHttpHandlerAdapter.apply(ReactorHttpHandlerAdapter.java:67)
    *__Mono.deferContextual ⇢ at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:1317)
Original Stack Trace:
        at org.springframework.http.ReadOnlyHttpHeaders.set(ReadOnlyHttpHeaders.java:112) ~[spring-web-6.2.12.jar:6.2.12]
        at org.springframework.http.HttpHeaders.setContentType(HttpHeaders.java:1032) ~[spring-web-6.2.12.jar:6.2.12]
        at org.springframework.http.codec.ServerSentEventHttpMessageWriter.write(ServerSentEventHttpMessageWriter.java:111) ~[spring-web-6.2.12.jar:6.2.12]
        at org.springframework.http.codec.ServerSentEventHttpMessageWriter.write(ServerSentEventHttpMessageWriter.java:176) ~[spring-web-6.2.12.jar:6.2.12]
        at org.springframework.web.reactive.result.method.annotation.AbstractMessageWriterResultHandler.writeBody(AbstractMessageWriterResultHandler.java:233) ~[spring-webflux-6.2.12.jar:6.2.12]
        at org.springframework.web.reactive.result.method.annotation.ResponseEntityResultHandler.lambda$handleResult$1(ResponseEntityResultHandler.java:205) ~[spring-webflux-6.2.12.jar:6.2.12]
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:153) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:63) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:297) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:478) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2570) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:470) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.MonoZip$ZipCoordinator.request(MonoZip.java:220) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2366) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onSubscribe(FluxOnErrorResume.java:74) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:129) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4576) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:258) ~[reactor-core-3.7.12.jar:3.7.12]
        at org.springframework.http.server.reactive.ChannelSendOperator$WriteCompletionBarrier.onError(ChannelSendOperator.java:419) ~[spring-web-6.2.12.jar:6.2.12]
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onError(FluxConcatArray.java:186) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:280) ~[reactor-core-3.7.12.jar:3.7.12]
        at reactor.netty.channel.MonoSendMany$SendManyInner.run(MonoSendMany.java:352) ~[reactor-netty-core-1.2.11.jar:1.2.11]
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) ~[netty-common-4.1.128.Final.jar:4.1.128.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:166) ~[netty-common-4.1.128.Final.jar:4.1.128.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) ~[netty-common-4.1.128.Final.jar:4.1.128.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[netty-common-4.1.128.Final.jar:4.1.128.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569) ~[netty-transport-4.1.128.Final.jar:4.1.128.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998) ~[netty-common-4.1.128.Final.jar:4.1.128.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.128.Final.jar:4.1.128.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.128.Final.jar:4.1.128.Final]
        at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

Solution

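  • This is expected behavior, because you are returning a stream of objects (a Flux). If the stream fails partway through, the server logs the exception and closes the connection. The part of the response that has already been sent cannot be changed.

    A ControllerAdvice cannot turn an exception raised inside a Flux into a different response. It can only do that for errors raised outside the stream, before the response is committed. Once the first element has been written, the status and headers have already been sent, and the advice has no access to the Flux sink to append anything. That is why the console shows an UnsupportedOperationException from ReadOnlyHttpHeaders: the handler was invoked, but its 400 response could no longer be written.

    If you want to change what you emit when the failure happens, apply the onErrorResume operator to the Flux and return your error object as the final element of the stream.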

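    // ResponseMessage is not shown here; it is assumed to be a simple wrapper
    // with a (String type, Object payload) constructor.
    @GetMapping(value = "/stream", produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<ResponseMessage> stream() {
        return service.getFlux()
            // Wrap every regular element so the client can tell data from errors.
            .map(data -> new ResponseMessage("data", data))
            // On failure, emit one final "error" element instead of just closing.
            .onErrorResume(e -> Flux.just(
                new ResponseMessage("error", Map.of(
                    // Map.of rejects null values, and getMessage() may return null.
                    "message", String.valueOf(e.getMessage()),
                    "type", e.getClass().getSimpleName()
                ))
            ));
    }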
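
Why the handler cannot take effect once streaming has begun can be illustrated without any framework. The following is a minimal plain-Java sketch, not Spring code: ToyResponse, mapElement, stream and buffer are hypothetical names invented for this illustration, and the model assumes only that the status can no longer be changed after the first body write. It fails on the fourth element, as the flatMap in the question does.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a committed HTTP response; illustrative stand-ins, not Spring types. */
public class CommittedResponseDemo {

    /** Hypothetical response: the status may change only until the first body write. */
    static final class ToyResponse {
        private int status = 200;
        private boolean committed = false;
        private final List<String> body = new ArrayList<>();

        void setStatus(int newStatus) {
            if (committed) {
                // Plays the role of the read-only headers failure in the console output.
                throw new UnsupportedOperationException("response already committed");
            }
            status = newStatus;
        }

        void write(String chunk) {
            committed = true; // status and headers go out with the first chunk
            body.add(chunk);
        }

        int status() { return status; }
        List<String> body() { return body; }
    }

    /** Fails on the fourth element (index 3), like the flatMap in the question. */
    static String mapElement(int index, String element) {
        if (index == 3) {
            throw new RuntimeException("ERROR");
        }
        return element;
    }

    /** Streaming: each element is written as soon as it has been mapped. */
    static ToyResponse stream(List<String> source) {
        ToyResponse response = new ToyResponse();
        try {
            for (int i = 0; i < source.size(); i++) {
                response.write(mapElement(i, source.get(i)));
            }
        } catch (RuntimeException ex) {
            try {
                response.setStatus(400); // what the exception handler would like to do
            } catch (UnsupportedOperationException tooLate) {
                // Nothing can be changed any more; the connection simply ends.
            }
        }
        return response;
    }

    /** Buffering: nothing is written until every element has been mapped. */
    static ToyResponse buffer(List<String> source) {
        ToyResponse response = new ToyResponse();
        try {
            List<String> mapped = new ArrayList<>();
            for (int i = 0; i < source.size(); i++) {
                mapped.add(mapElement(i, source.get(i)));
            }
            mapped.forEach(response::write);
        } catch (RuntimeException ex) {
            response.setStatus(400); // still allowed: nothing has been written yet
            response.write("{\"message\":\"" + ex.getMessage() + "\"}");
        }
        return response;
    }

    public static void main(String[] args) {
        List<String> customers = List.of("eisa", "sam", "yunos", "fourth", "fifth");
        ToyResponse streamed = stream(customers);
        ToyResponse buffered = buffer(customers);
        System.out.println(streamed.status() + " " + streamed.body());
        System.out.println(buffered.status() + " " + buffered.body());
    }
}
```

Running main prints `200 [eisa, sam, yunos]` for the streamed case, matching the three objects seen in Postman before the connection closed, and `400 [{"message":"ERROR"}]` for the buffered case. In WebFlux terms the buffered variant loosely corresponds to collecting the Flux first (for example with collectList()) and returning a Mono, so that the error surfaces before anything is written. That gives up streaming, so whether the trade-off is acceptable depends on the endpoint.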