I'm using Spring Integration with the Java DSL to handle communication between JMS and a REST service. The requirement is that messages should be redelivered indefinitely. In one case, I have to execute two operations sequentially. If the first one fails, the second one shouldn't be executed, but if the failure is a 4xx error I shouldn't try to redeliver the message. My code looks like this:
    IntegrationFlows.from(Jms.messageDrivenChannelAdapter(Jms.container(connectionFactory, destination)).get())
            .publishSubscribeChannel(c -> c
                    .subscribe(firstRestOperation ->
                            firstRestOperation
                                    .transform(originalMessageToFirstRequestTransformer())
                                    // When this handler receives an HTTP 4xx status, the second operation
                                    // shouldn't be executed and the message shouldn't be redelivered.
                                    .handle(Http.outboundGateway(restApiBaseUri + "/first-endpoint", restTemplate)
                                            .httpMethod(HttpMethod.POST).get()))
                    .subscribe(secondRestOperation ->
                            secondRestOperation
                                    .transform(originalMessageToSecondRequestTransformer())
                                    .handle(Http.outboundGateway(restApiBaseUri + "/second-endpoint", restTemplate)
                                            .httpMethod(HttpMethod.POST).get()))
            ).get();
    // Used in Option B: swallows 4xx responses instead of throwing.
    class MyErrorHandler extends DefaultResponseErrorHandler {
        @Override
        public void handleError(ClientHttpResponse response) throws IOException {
            if (response.getStatusCode().is4xxClientError()) {
                log.warn(...);
            } else {
                super.handleError(response);
            }
        }
    }

    @Bean
    public RestTemplate restTemplate() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.setErrorHandler(myErrorHandler); // used in Option B
        return restTemplate;
    }
How can I meet these requirements? The only idea I have is to somehow interrupt the IntegrationFlow while committing the JMS session.
Thanks for any suggestions.
UPDATE
Option A: Currently:
Option B: I can also handle 4xx error, then:
but this causes operation 2 to be executed
What I need is:
operation 1 fails with 4xx
operation 2 is not executed
message is not redelivered
UPDATE 2
I think I might be getting somewhere. As @gary-russel suggested, I added an error channel and handled the 4xx errors:
    @Bean
    public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
        return Jms.messageDrivenChannelAdapter(
                        Jms.container(connectionFactory, destination).messageSelector(jmsSelector))
                .errorChannel(errorHandlingChannel())
                .get();
    }

    @Bean
    public PublishSubscribeChannel errorHandlingChannel() {
        return MessageChannels.publishSubscribe().get();
    }

    @Bean
    public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
        ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
        router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
        router.setDefaultOutputChannel(unhandledErrorsChannel());
        return router;
    }

    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(errorHandlingChannel())
                .log()
                .route(errorMessageExceptionTypeRouter())
                .get();
    }

    @Bean
    public MessageChannel clientErrorMessageChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow clientErrorFlow() {
        return IntegrationFlows.from(clientErrorMessageChannel())
                .handle(message -> log.warn(...)) // handle error here
                .get();
    }

    @Bean
    public MessageChannel unhandledErrorsChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow unhandledErrorsFlow() {
        // how should I implement it?
    }
I want to handle ONLY 4xx errors; the rest should be propagated and should cause JMS message redelivery. I tried not setting defaultOutputChannel in ErrorMessageExceptionTypeRouter (then another exception is thrown) and setting defaultOutputChannel to the default errorChannel (then all of the errors are handled).
UPDATE 3
I found the solution in: Spring Integration Java DSL using JMS retry/redlivery
Here's the code for my error handling flow:
    @Bean
    public MessageProducerSupport inputUpsertCustomerMessageProducerSupport() {
        return Jms.messageDrivenChannelAdapter(
                        Jms.container(connectionFactory, destination).messageSelector(jmsSelector))
                .errorChannel(customErrorChannel())
                .get();
    }

    @Bean
    public PublishSubscribeChannel customErrorChannel() {
        return MessageChannels.publishSubscribe().get();
    }

    @Bean
    public ErrorMessageExceptionTypeRouter errorMessageExceptionTypeRouter() {
        ErrorMessageExceptionTypeRouter router = new ErrorMessageExceptionTypeRouter();
        router.setChannelMapping(HttpClientErrorException.class.getName(), "clientErrorMessageChannel");
        router.setDefaultOutputChannel(unhandledErrorsChannel());
        return router;
    }

    @Bean
    public MessageChannel clientErrorMessageChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public MessageChannel unhandledErrorsChannel() {
        return MessageChannels.direct().get();
    }

    // Everything that is not a 4xx error ends up here and is rethrown,
    // so it propagates back to the JMS container and triggers redelivery.
    @Bean
    public IntegrationFlow unhandledErrorsFlow() {
        return IntegrationFlows.from(unhandledErrorsChannel())
                .handle("thisBeanName", "handleError")
                .get();
    }

    public void handleError(Throwable t) throws Throwable {
        log.warn("Received unhandled exception");
        throw t;
    }

    // 4xx errors are only logged, so the message is consumed and not redelivered.
    @Bean
    public IntegrationFlow clientErrorFlow() {
        return IntegrationFlows.from(clientErrorMessageChannel())
                .handle(message -> log.warn("Received HTTP Status 4xx with message: "
                        + ((MessageHandlingException) message.getPayload()).getCause().getMessage()))
                .get();
    }

    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(customErrorChannel())
                .log()
                .route(errorMessageExceptionTypeRouter())
                .get();
    }
So the solution was to redirect exceptions to a flow that handles them by rethrowing them. Too bad BaseIntegrationFlow doesn't have a method that accepts and throws Throwable - right now it's only possible by specifying the bean and method name to invoke.
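To make the idea easier to see without the Spring wiring, here is a minimal plain-Java sketch of the same pattern: look for a mapped exception type in the cause chain, consume it if found, and rethrow everything else. It is only a model, not Spring Integration code. ClientErrorSketchException, ErrorRoutingSketch and deliver are hypothetical names made up for the illustration, and the "acknowledged"/"redelivered" labels assume a transacted JMS session where an exception escaping the listener rolls the message back.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for an HTTP 4xx failure; not a Spring class. */
class ClientErrorSketchException extends RuntimeException {
    ClientErrorSketchException(String message) {
        super(message);
    }
}

/** Simplified model of type-based error routing with a rethrowing default. */
final class ErrorRoutingSketch {
    private final Map<Class<? extends Throwable>, Consumer<Throwable>> routes = new LinkedHashMap<>();

    ErrorRoutingSketch route(Class<? extends Throwable> type, Consumer<Throwable> handler) {
        routes.put(type, handler);
        return this;
    }

    /** Returns normally if a mapped handler consumed the error; otherwise rethrows it. */
    void handle(RuntimeException error) {
        // Walk the cause chain, because the interesting exception is usually wrapped.
        for (Throwable t = error; t != null; t = t.getCause()) {
            for (Map.Entry<Class<? extends Throwable>, Consumer<Throwable>> route : routes.entrySet()) {
                if (route.getKey().isInstance(t)) {
                    route.getValue().accept(t);
                    return; // consumed: nothing propagates to the caller
                }
            }
        }
        throw error; // unmapped: propagate so the caller can roll back
    }

    /** Models the listener: a consumed error means the message is done, a rethrown one means rollback. */
    static String deliver(RuntimeException failure) {
        ErrorRoutingSketch router = new ErrorRoutingSketch()
                .route(ClientErrorSketchException.class,
                        t -> System.out.println("4xx handled: " + t.getMessage()));
        try {
            router.handle(failure);
            return "acknowledged";
        } catch (RuntimeException e) {
            return "redelivered";
        }
    }
}
```

For example, deliver(new RuntimeException("handler failed", new ClientErrorSketchException("400 Bad Request"))) takes the consumed path, while deliver(new IllegalStateException("503")) takes the rethrow path.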
That is the default behavior; the second subscriber won't be called unless the ignoreFailures property is true (it is false by default).
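As a rough illustration of that rule, the plain-Java sketch below models a dispatcher that calls its subscribers one after another on the same thread. It is a simplified model under my own assumptions, not the framework's implementation; SequentialDispatcherSketch and run are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of a sequential publish-subscribe dispatcher (not Spring's implementation). */
final class SequentialDispatcherSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private final boolean ignoreFailures;

    SequentialDispatcherSketch(boolean ignoreFailures) {
        this.ignoreFailures = ignoreFailures;
    }

    SequentialDispatcherSketch subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
        return this;
    }

    /** Calls subscribers in order; the first failure aborts the rest unless ignoreFailures is set. */
    void dispatch(String message) {
        for (Consumer<String> subscriber : subscribers) {
            try {
                subscriber.accept(message);
            } catch (RuntimeException e) {
                if (!ignoreFailures) {
                    throw e; // propagates to the caller; later subscribers never run
                }
                // ignoreFailures == true: swallow the failure and continue
            }
        }
    }

    /** Runs two subscribers where the first always fails and records what happened. */
    static List<String> run(boolean ignoreFailures) {
        List<String> events = new ArrayList<>();
        SequentialDispatcherSketch dispatcher = new SequentialDispatcherSketch(ignoreFailures)
                .subscribe(m -> {
                    events.add("first");
                    throw new IllegalStateException("first operation failed");
                })
                .subscribe(m -> events.add("second"));
        try {
            dispatcher.dispatch("payload");
        } catch (RuntimeException e) {
            events.add("propagated");
        }
        return events;
    }
}
```

With run(false) the second subscriber is skipped and the exception reaches the caller; with run(true) the failure is swallowed and the second subscriber still runs.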
You need to show the upstream flow, but to "catch" the exception you need to add an error channel to the (presumably) message-driven inbound adapter and handle the exception there.