I'm looking to implement an integration flow with a split/aggregate. I would like to stop execution of handlers when an Exception is thrown, and continue iteration of all elements split.
IntegrationFlow.from(
WebFlux.inboundGateway("/jira/version")
.requestMapping(r -> r.methods(HttpMethod.POST)
.consumes("application/json"))
.requestPayloadType(String.class)
.replyChannel(replyChannel)
.errorChannel(errorChannel)
.mappedRequestHeaders(parameter.getJiraHeaderSignature()))
.split()
.handle(handler1, s -> s.advice(advice()))
.handle(handler2, s -> s.advice(advice()))
.handle(handler3, s -> s.advice(advice()))
.aggregate()
.get();
public Advice advice() {
var advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setReturnFailureExpressionResult(true);
advice.setOnFailureExpressionString("payload");
return advice;
}
Example : collection with 2 elements split:
aggregate :
This behaviour is "ok" when I used TrapException
, the handler chain is stopped, but aggregate failed because is never ends (see Trap Exception with splitter)
Is there a solution to do this ?
UPDATE #1:
You say right and it's correct. Detour
works perfectly.
But I don't understand how filter ErrorMessage
directly with aggregator. When I do this with a handle after aggregate, it's works, but how implement this directly with aggregator pliz ?
...
.handle(handler1, s -> s.advice(advice()))
.handle(handler2, s -> s.advice(advice()))
.channel("jiraAggregatorInputChannel")
.aggregate()
.<List>handle((p, h) -> p.stream()
.filter(i -> i instanceof customObject)
.toList())
.get()
public Advice advice() {
var advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setTrapException(true);
advice.setFailureChannelName("jiraAggregatorInputChannel");
return advice;
}
UPDATE #2:
For me it's not the same thing, see this. When I use the filter outside of the aggregator, my flow returns good a json with a list of CustomObject
IntegrationFlow.from(
WebFlux.inboundGateway("/jira/version")
.requestMapping(r -> r.methods(HttpMethod.POST)
.consumes("application/json"))
.requestPayloadType(String.class)
.replyChannel(replyChannel)
.errorChannel(errorChannel))
.handle(handler1, s -> s.advice(advice()))
.handle(handler2, s -> s.advice(advice()))
.channel("jiraAggregatorInputChannel")
.aggregate()
.<List>handle((p, h) -> p.stream()
.filter(i -> i instanceof CustomObject)
.toList())
.get()
Result, list of CustomObject :
[{ "issuesNumberAkuiteo": ....
"idVersionJira": ....
}]
But when I use filter inside of the aggregator, the flow returns a List of GenericMessage
IntegrationFlow.from(
WebFlux.inboundGateway("/jira/version")
.requestMapping(r -> r.methods(HttpMethod.POST)
.consumes("application/json"))
.requestPayloadType(String.class)
.replyChannel(replyChannel)
.errorChannel(errorChannel))
.handle(handler1, s -> s.advice(advice()))
.handle(handler2, s -> s.advice(advice()))
.channel("jiraAggregatorInputChannel")
.aggregate(a -> a.outputProcessor(group -> group
.getMessages()
.stream()
.filter(i -> i.getPayload() instanceof CustomObject
.toList()))
.get()
Result :
[{ "payload": { .... }
"headers": { .... }
}]
extractReplyPayload
from WebFlux
is true
by default, I don't understand why the behaviour is different :(
UPDATE #3:
IntegrationFlow.from(
WebFlux.inboundGateway("/jira/version")
.requestMapping(r -> r.methods(HttpMethod.POST)
.consumes("application/json"))
.requestPayloadType(String.class)
.replyChannel(replyChannel)
.errorChannel(errorChannel))
.handle(handler1, s -> s.advice(advice()))
.handle(handler2, s -> s.advice(advice()))
.channel("jiraAggregatorInputChannel")
.aggregate(a -> a.outputProcessor(group -> group
.getMessages()
.stream()
.map(Message::getPayload)
.filter(i -> i instanceof CustomObject)
.toList()))
.get()
Returns an exception :
The expected collection of Messages contains non-Message element: class fr.eksae.erp.connector.model.CustomObject: class fr.eksae.erp.connector.model.CustomObject
I can't copy paste all the stack, but last code is :
org.springframework.util.Assert,assignableCheckFailed,Assert.java,610
org.springframework.util.Assert,isAssignable,Assert.java,560
org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler,verifyResultCollectionConsistsOfMessages,AbstractCorrelatingMessageHandler.java,948
org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler,completeGroup,AbstractCorrelatingMessageHandler.java,907
SOLUTION :
IntegrationFlow.from(
WebFlux.inboundGateway("/jira/version")
.requestMapping(r -> r.methods(HttpMethod.POST)
.consumes("application/json"))
.requestPayloadType(String.class)
.replyChannel(replyChannel)
.errorChannel(errorChannel))
.split()
.handle(handler1, s -> s.advice(advice()))
.handle(handler2, s -> s.advice(advice()))
.channel("jiraAggregatorInputChannel")
.aggregate(a -> a.outputProcessor(
group -> new GenericMessage<>(group
.getMessages()
.stream()
.filter(i -> i.getPayload() instanceof CustomObject)
.map(Message::getPayload)
.toList())))
.get()
public Advice advice() {
var advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setTrapException(true);
advice.setFailureChannelName("jiraAggregatorInputChannel");
return advice;
}
So, to summarize, you want the failed message to go directly to the aggregator, bypassing the rest of flow in between.
Yes, that is possible, and pattern called Detour. There is no such a component in Spring Integration because it simply can be implemented with channels manipulation or with a router.
What I will suggest would not fit into that Gregor Hohpe's explanation, but I still would call it detour
, because that's exactly what we are going to do here with you for your task.
So, first of all all the EIP-methods in the IntegrationFlow
represent endpoints. And as we know all the endpoints in Spring Integration interact with each other via MessageChannel
. Even if we don't use channel()
DSL in our IntergationFlow
, there are still DirectChannel
beans injected in between. More info in docs: https://docs.spring.io/spring-integration/reference/dsl/java-basics.html.
At the same time no one stops us from declaring something like .channel("jiraAggregatorInputChannel")
before that aggregate()
. That would work just because everything from IntegrationFlow
definition is going to be registered as a bean into the application context: the MessageChannel
with that name, an AggregatingMessageHandler
and EventDrivenConsumer
to subscribe to this handler to that channel. Therefore, at runtime we simply can send a message to that channel and it is going to be handled properly by aggregator. And that is exactly what happens after your last handle()
- it will emit its reply into that channel and so on.
This explicit channel is a inbound side of our detour.
Now, look into that ExpressionEvaluatingRequestHandlerAdvice
and pay attention to its:
/**
* Set the channel to which to send the {@link ErrorMessage} after evaluating the
* failure expression.
* @param failureChannel the channel.
*/
public void setFailureChannel(MessageChannel failureChannel) {
So, when exception is thrown from your handler, it is going to be sent as an ErrorMessage
to that failureChannel
.
You would still need to use trapException = true
, to not let it to bubble.
But at the same time you must not use returnFailureExpressionResult = true
. Just because we don't want error to be handled with the rest of logic, but rather go directly to the aggregator. Detour, in other words!
Now, since all the messages with respective correlation group are sent to the aggregator, you can do some smart logic filtering out those ErrorMessage
items in the group or do something else.
You don't need, though, that setOnFailureExpressionString("payload")
because it is a default one when we declare a channel:
if (this.onFailureExpression == null
&& (this.failureChannel != null || StringUtils.hasText(this.failureChannelName))) {
this.onFailureExpression = DEFAULT_EXPRESSION;
}
The DEFAULT_EXPRESSION
is a function for Message::getPayload
.
UPDATE
To make aggregated group manipulation, you can add this option to your aggregator:
.aggregate(a -> a
.outputProcessor((group) -> group
.getMessages()
.stream()
.map(m -> (String) m.getPayload())
.collect(Collectors.joining(" ")))
I show this as an example, but you can do whatever you want with that stream of messages. Technically similar to what you have with that next handle()
, but directly in the aggregator API. You really can check in the filter
of the stream for an ErrorMessage
type, if you are not interested in errors downstream.