java spring-boot spring-integration spring-integration-dsl spring-integration-http

How to catch exception and throw my own exception inside ExpressionEvaluatingRequestHandlerAdvice and handle it through my own exception handler?


I'm using Spring Integration with the scatter-gather pattern in my project. I've configured a timeout on the RestTemplate, and I'm using ExpressionEvaluatingRequestHandlerAdvice to catch the timeout exception. In the failure flow I want to catch that exception and throw my own custom exception, which my existing exception handler should handle so that I can show a proper error message to the user. The custom exception is thrown, but my exception handler never handles it, so the user does not get my custom message back.

//configuration

 @Bean
  public IntegrationFlow mainFlow() {
    return flow ->
        flow.split()
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .scatterGather(
                scatterer ->
                    scatterer
                        .applySequence(true)
                        .recipientFlow(flow1())
                        .recipientFlow(flow2()),
                gatherer ->
                    gatherer
                        .releaseLockBeforeSend(true)
                        .releaseStrategy(group -> group.size() == 1))
            .aggregate()
            .to(anotherFlow());
  }

@Bean
  public IntegrationFlow flow2() {
    return flow -> {
        flow.channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(
                Http.outboundGateway(
                        "http://localhost:4444/test", dummyService.restTemplate())
                    .httpMethod(HttpMethod.POST)
                    .expectedResponseType(String.class),
                c -> c.advice(expressionAdvice()));
    };
  }

 @Bean
  public Advice expressionAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice =
        new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setSuccessChannelName("success.input");
    advice.setOnSuccessExpressionString("payload + ' was successful'");
    advice.setFailureChannelName("failure.input");
    advice.setOnFailureExpressionString("'Failed ' + #exception.cause.message");
    advice.setReturnFailureExpressionResult(true);
    advice.setTrapException(true);
    return advice;
  }

  @Bean
  public IntegrationFlow success() {
    return f -> f.handle(System.out::println);
  }

  @Bean
  public IntegrationFlow failure() {
    return f ->
        f.handle(
                (p, h) -> {
                  if (p.toString().contains("Read timed out"))
                    throw new MyCustomException(ErrorCode.TIMEOUT_ERROR.getErrorData());
                  else throw new MyCustomException(ErrorCode.SERVICE_ERROR.getErrorData());
                });
  }

//DummyService class

@Configuration
public class DummyService {

  private final int TIMEOUT = (int) TimeUnit.SECONDS.toMillis(6);

  @Bean
  public RestTemplate restTemplate() {

    HttpComponentsClientHttpRequestFactory requestFactory =
        new HttpComponentsClientHttpRequestFactory();

    requestFactory.setHttpClient(httpClient);
    requestFactory.setConnectTimeout(TIMEOUT);
    requestFactory.setReadTimeout(TIMEOUT);
    RestTemplate restTemplate = new RestTemplate(requestFactory);
    return restTemplate;
  }
}

In the failure() flow I throw the new exception, and it is thrown as expected, but my custom exception handling framework does not catch it. It catches exceptions everywhere else; only inside the Spring Integration configuration class does it not work.


Solution

  • When you do an async hand-off, as with your flow.channel(c -> c.executor(Executors.newCachedThreadPool())), you cannot re-throw exceptions from that thread to the caller: control is lost. Instead, you can populate an errorChannel header for this flow, so that the async ErrorHandler on the task executor publishes an ErrorMessage to that channel, where you can apply your own logic. From there you can return a so-called compensation message to the waiting gateway, which is the gatherer in your case. It is very important to preserve the request headers; otherwise the gatherer won't be able to correlate the reply with the request.
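
A sketch of how the errorChannel header could be populated in flow2 before the hand-off to the executor, assuming the Spring Integration 5.x Java DSL. The class name Flow2Config and the channel name scatterGatherErrorChannel are illustrative choices, the RestTemplate is assumed to be injected as a bean, and the advice from the question is left out for brevity:

```java
import java.util.concurrent.Executors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.http.dsl.Http;
import org.springframework.messaging.MessageHeaders;
import org.springframework.web.client.RestTemplate;

@Configuration
public class Flow2Config { // hypothetical class name

  @Bean
  public IntegrationFlow flow2(RestTemplate restTemplate) {
    return flow ->
        flow
            // Set the header BEFORE the async hand-off so the executor's
            // error handler can find it on the failed message.
            // "scatterGatherErrorChannel" is an assumed channel name.
            .enrichHeaders(h ->
                h.header(MessageHeaders.ERROR_CHANNEL, "scatterGatherErrorChannel", true))
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(
                Http.outboundGateway("http://localhost:4444/test", restTemplate)
                    .httpMethod(HttpMethod.POST)
                    .expectedResponseType(String.class));
  }
}
```

Whatever subscribes to that channel then receives the ErrorMessage and decides what to send back to the gatherer.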

    UPDATE

    Your understanding of async error handling is not quite correct.

    The ExecutorChannel wraps a provided Executor into an ErrorHandlingTaskExecutor, which has the logic like this:

    public void execute(final Runnable task) {
        this.executor.execute(() -> {
            try {
                task.run();
            }
            catch (Throwable t) { //NOSONAR
                ErrorHandlingTaskExecutor.this.errorHandler.handleError(t);
            }
        });
    }
    

    As you can see, the errorHandler is called inside the task, so re-throwing an exception from there just ends up in the void, according to your ThreadPoolExecutor:

    protected void afterExecute(Runnable r, Throwable t) { }
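
A minimal, framework-free sketch of that behavior, using only the JDK. The wrapper below mimics the shape of ErrorHandlingTaskExecutor but is not the Spring class, and the names AsyncErrorDemo, errorHandling and runFailingTask are made up for illustration. The failing task runs on a pool thread, so the submitting thread never sees the exception; only the error handler does:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class AsyncErrorDemo {

    /** Simplified stand-in for an error-handling executor: catches on the worker thread. */
    static Executor errorHandling(Executor delegate, Consumer<Throwable> errorHandler) {
        return task -> delegate.execute(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                // Runs on the worker thread; there is no caller to re-throw to.
                errorHandler.accept(t);
            }
        });
    }

    /** Submits a failing task and returns what the error handler observed. */
    static Throwable runFailingTask() {
        ExecutorService pool = Executors.newCachedThreadPool();
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);
        Executor executor = errorHandling(pool, t -> {
            seen.set(t);
            handled.countDown();
        });
        try {
            // execute() returns immediately; the exception never reaches this thread.
            executor.execute(() -> {
                throw new IllegalStateException("Read timed out");
            });
            handled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("Error handler saw: " + runFailingTask());
    }
}
```

The only place the failure is visible is the handler callback, which is why the handler has to turn it into a message rather than throw.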
    

    That's why we talk about a compensation message instead of re-throwing a custom exception. That compensation message could be a new ErrorMessage, but it has to preserve the headers from the request. Something like this:

        @ServiceActivator(inputChannel = "scatterGatherErrorChannel")
        public Message<?> processAsyncScatterError(MessagingException payload) {
            return MessageBuilder.withPayload(payload.getCause())
                    .copyHeaders(payload.getFailedMessage().getHeaders())
                    .build();
        }
    

    This message is returned to the scatter-gather as a reply. If you are not interested in that, you can bypass the scatter-gather reply and propagate your custom error directly to the origin gateway's error channel. Something like this:

        @ServiceActivator(inputChannel = "scatterGatherErrorChannel")
        public Message<?> processAsyncScatterError(MessagingException payload) {
            MessageHeaders requestHeaders = payload.getFailedMessage().getHeaders();
            return MessageBuilder.withPayload(payload.getCause())
                    .copyHeaders(requestHeaders)
                    .setHeader(MessageHeaders.REPLY_CHANNEL, requestHeaders.get("originalErrorChannel"))
                    .build();
        }
    

    The originalErrorChannel header comes from the request message sent to the scatter-gather, and it is the channel the origin gateway is waiting on.
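
To illustrate why copying the request headers matters, here is a toy model in plain Java. It does not use Spring's Message or MessageBuilder; the Msg class, the correlationId header name and the correlates check are all hypothetical simplifications of what a gatherer does when it matches a reply to a request:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class CompensationDemo {

    /** Hypothetical stand-in for a message: a payload plus immutable headers. */
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }
    }

    /** Builds a request carrying the header the toy gatherer correlates on. */
    static Msg request(String correlationId) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("correlationId", correlationId);
        return new Msg("request body", headers);
    }

    /** Toy gatherer check: a reply is accepted only if its correlationId matches. */
    static boolean correlates(Msg request, Msg reply) {
        Object expected = request.headers.get("correlationId");
        return expected != null && expected.equals(reply.headers.get("correlationId"));
    }

    /** Compensation reply: the error as payload, all request headers copied over. */
    static Msg compensation(Msg failedRequest, Throwable cause) {
        return new Msg(cause, failedRequest.headers);
    }

    public static void main(String[] args) {
        Msg request = request("abc-123");
        Throwable cause = new RuntimeException("Read timed out");
        // With copied headers the reply can be matched to its request.
        System.out.println(correlates(request, compensation(request, cause)));
        // Without them the reply cannot be matched.
        System.out.println(correlates(request, new Msg(cause, new HashMap<>())));
    }
}
```

A reply built without the request headers fails the check, which is the situation copyHeaders avoids in the handlers above.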