
Error handling in PublishSubscribeChannel


I'm using the Spring Integration Java DSL to handle communication between JMS and a REST service. The requirement is that messages should be redelivered indefinitely. In one case, I have to execute two operations sequentially. If the first one fails, the second one shouldn't be executed; but if the failure is a 4xx error, I shouldn't try to redeliver the message. My code looks like this:

IntegrationFlows.from(Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination)).get())
   .publishSubscribeChannel(c -> c
      .subscribe(firstRestOperation ->
         firstRestOperation
            .transform(originalMessageToFirstRequestTransformer())
            // when this handler receives HTTP status 4xx, the second operation
            // shouldn't be executed and the message shouldn't be redelivered
            .handle(Http.outboundGateway(restApiBaseUri + "/first-endpoint", restTemplate)
                        .httpMethod(HttpMethod.POST).get()))
      .subscribe(secondRestOperation ->
         secondRestOperation
            .transform(originalMessageToSecondRequestTransformer())
            .handle(Http.outboundGateway(restApiBaseUri + "/second-endpoint", restTemplate)
                        .httpMethod(HttpMethod.POST).get())))
   .get();

class MyErrorHandler extends DefaultResponseErrorHandler { // this is used in Option B

    @Override
    public void handleError(ClientHttpResponse response) throws IOException {
        if (response.getStatusCode().is4xxClientError()) {
            // 4xx: only log, so no exception is thrown
            log.warn(...);
        } else {
            // anything else: fall back to the default handling, which throws
            super.handleError(response);
        }
    }
}

@Bean
public RestTemplate restTemplate() {
   RestTemplate restTemplate = new RestTemplate();
   restTemplate.setErrorHandler(myErrorHandler); //this is used in Option B
   return restTemplate;
}

How can I meet these requirements? The only idea I have is to somehow interrupt the IntegrationFlow while still committing the JMS session.

Thanks for any suggestions.

UPDATE

Option A: Currently:

Option B: I can also handle 4xx error, then:

but this causes operation 2 to be executed

What I need is:

UPDATE 2

I think I might be getting somewhere. As @gary-russel suggested, I added an error channel and handled 4xx errors:

    @Bean
    public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
        return Jms.messageDrivenChannelAdapter(
                        Jms.container(connectionFactory, destination).messageSelector(jmsSelector))
                .errorChannel(errorHandlingChannel())
                .get();
    }


    @Bean
    public PublishSubscribeChannel errorHandlingChannel() {
        return MessageChannels.publishSubscribe().get();
    }
    
    @Bean
    public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
        ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
        router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
        router.setDefaultOutputChannel(unhandledErrorsChannel());
        return router;
    }


    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(errorHandlingChannel())
                .log()
                .route(errorMessageExceptionTypeRouter())
                .get();
    }
    

    @Bean
    public MessageChannel clientErrorMessageChannel(){
        return MessageChannels
                .direct()
                .get();
    }


    @Bean
    public IntegrationFlow clientErrorFlow() {
        return IntegrationFlows.from(clientErrorMessageChannel())
                .handle(message -> log.warn(...))    // handle error here
                .get();
    }


    @Bean
    public MessageChannel unhandledErrorsChannel(){
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow unhandledErrorsFlow(){
        //how should I implement it?
    }

I want to handle ONLY 4xx errors; everything else should be propagated and should cause JMS message redelivery. I tried not setting defaultOutputChannel on the ErrorMessageExceptionTypeRouter (then another exception is thrown) and setting defaultOutputChannel to the default errorChannel (then all of the errors are handled).

UPDATE 3

I found the solution in: "Spring Integration Java DSL using JMS retry/redlivery"

Here's the code for my error handling flow:

    @Bean
    public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
        return Jms.messageDrivenChannelAdapter(
                        Jms.container(connectionFactory, destination).messageSelector(jmsSelector))
                .errorChannel(customErrorChannel())
                .get();
    }

    @Bean
    public PublishSubscribeChannel customErrorChannel() {
        return MessageChannels.publishSubscribe().get();
    }

    @Bean
    public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
        ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
        router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
        router.setDefaultOutputChannel(unhandledErrorsChannel());
        return router;
    }

    @Bean
    public MessageChannel clientErrorMessageChannel(){
        return MessageChannels
                .direct()
                .get();
    }

    @Bean
    public MessageChannel unhandledErrorsChannel(){
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow unhandledErrorsFlow(){
        return IntegrationFlows.from(unhandledErrorsChannel()).handle("thisBeanName", "handleError").get();
    }

    public void handleError(Throwable t) throws Throwable {
        log.warn("Received unhandled exception");
        throw t;
    }

    @Bean
    public IntegrationFlow clientErrorFlow() {
        return IntegrationFlows.from(clientErrorMessageChannel())
                .handle(message -> log.warn("Received HTTP Status 4xx with message: "
                        + ((MessageHandlingException) message.getPayload()).getCause().getMessage()))
                .get();
    }

    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(customErrorChannel())
                .log()
                .route(errorMessageExceptionTypeRouter())
                .get();
    }

So the solution was to route the otherwise unhandled exceptions to a flow that handles them by rethrowing them, which makes the JMS message get redelivered. Too bad BaseIntegrationFlow doesn't have a method that accepts and throws Throwable; right now this is only possible by specifying the bean and method name to invoke.
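
For anyone who wants to see the routing decision in isolation, here is a minimal plain-Java sketch of the same idea with no Spring classes involved: walk the cause chain of the failure, swallow it if a client error is found, and rethrow anything else so the caller (in the real flow, the JMS listener container) sees the failure. ClientErrorException and ErrorRoutingSketch are hypothetical names that stand in for HttpClientErrorException and the error flows above; this is only a model of the logic, not how Spring Integration implements it.

```java
// Plain-Java model of "handle 4xx, rethrow the rest" (no Spring classes involved).
class ErrorRoutingSketch {

    // Hypothetical stand-in for Spring's HttpClientErrorException.
    static class ClientErrorException extends RuntimeException {
        ClientErrorException(String message) {
            super(message);
        }
    }

    // Returns true if the failure or any of its causes is a client error.
    static boolean isClientError(Throwable failure) {
        for (Throwable t = failure; t != null; t = t.getCause()) {
            if (t instanceof ClientErrorException) {
                return true;
            }
        }
        return false;
    }

    // Swallows client errors (the message counts as consumed) and rethrows
    // everything else so the caller can roll back and redeliver.
    static String handle(RuntimeException failure) {
        if (isClientError(failure)) {
            return "dropped: " + failure.getMessage();
        }
        throw failure;
    }

    public static void main(String[] args) {
        // A client error wrapped in another exception, as it would be inside an error message.
        RuntimeException wrapped4xx =
                new RuntimeException("handler failed", new ClientErrorException("404 Not Found"));
        System.out.println(handle(wrapped4xx));

        try {
            handle(new IllegalStateException("503 Service Unavailable"));
        } catch (IllegalStateException e) {
            System.out.println("rethrown: " + e.getMessage());
        }
    }
}
```

The cause-chain walk is there because the exception that reaches the error channel is a wrapper (the code above casts the payload to MessageHandlingException), so checking only the top-level type would miss the 4xx case.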


Solution

  • That is the default behavior; the second subscriber won't be called unless the ignoreFailures property is true (it is false by default).

    You need to show the upstream flow, but to "catch" the exception you need to add an error channel to the (presumably) message-driven inbound adapter and handle the exception there.
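
    As a rough illustration of the ignoreFailures behavior described above, here is a plain-Java model of a sequential publish-subscribe dispatch. It is a sketch under the assumption that subscribers are invoked in order on the caller's thread (no executor); PubSubDispatchSketch, dispatch and run are hypothetical names, and the code is not Spring Integration's dispatcher.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of a sequential publish-subscribe dispatch (NOT Spring's implementation).
class PubSubDispatchSketch {

    // Delivers the payload to each subscriber in order, on the caller's thread.
    // With ignoreFailures == false the first exception aborts the dispatch and reaches the caller.
    static void dispatch(String payload, List<Consumer<String>> subscribers, boolean ignoreFailures) {
        for (Consumer<String> subscriber : subscribers) {
            try {
                subscriber.accept(payload);
            } catch (RuntimeException e) {
                if (!ignoreFailures) {
                    throw e; // remaining subscribers are skipped
                }
                // ignoreFailures == true: swallow the failure and continue with the next subscriber
            }
        }
    }

    // Runs a failing first subscriber followed by a second one and records what happened.
    static List<String> run(boolean ignoreFailures) {
        List<String> events = new ArrayList<>();
        Consumer<String> first = p -> {
            events.add("first");
            throw new IllegalStateException("HTTP 400");
        };
        Consumer<String> second = p -> events.add("second");
        try {
            dispatch("payload", Arrays.asList(first, second), ignoreFailures);
        } catch (IllegalStateException e) {
            events.add("propagated");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [first, propagated]
        System.out.println(run(true));  // prints [first, second]
    }
}
```

    With the default (false), the exception from the first subscriber reaches the caller, which is why it has to be dealt with on an error channel of the inbound adapter if some failures should not trigger redelivery.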