I'm looking to implement an integration flow with a split/aggregate. I would like to trap Exception throws by handlers, and iterate all elements splited one by one even if some element failed. When all elements throws exception, and exceptions are trapped, the flow never ends. What did I miss plz ? How can I reach the aggregate and the end of this flow ?
IntegrationFlow.from(
WebFlux.inboundGateway("/jira/version")
.requestMapping(r -> r.methods(HttpMethod.POST)
.consumes("application/json"))
.requestPayloadType(String.class)
.replyChannel(replyChannel)
.errorChannel(errorChannel)
.mappedRequestHeaders(parameter.getJiraHeaderSignature()))
.handle(versionWebhookHandler)
.split(new VersionIssueSplitter())
.handle(updateVersionHandler, s -> s.advice(advice()))
.aggregate()
.get();
public Advice advice() {
var advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setTrapException(true);
return advice;
}
The advice.setTrapException(true);
is the correct way to not let that exception to bubble and allow splitter to iterate naturally.
Your problem is with the .aggregate()
. While it is very convenient that splitter and aggregator works in tandem by default, you have to ensure that really all elements reach that aggregator to complete the group of messages properly.
When you trap (or just simply to say catch) exceptions on items handling, there is no reply to send from that handle
down to aggregate
. So, an aggregator cannot fulfill group completion requirements, therefore your flow never ends.
We don't know what is your expectations, but apparently you have to emit anything either way even if item handling ends up with an error. You might come up with some custom object to return from the failureExpression
and returnFailureExpressionResult = true
instead of trapException
. The handler then will take care about wrapping this result into a message with proper correlation details headers from the request.
Then you may look into the:
/**
* A processor to determine the output message from the released group. Defaults to a message
* with a payload that is a collection of payloads from the input messages.
* @param outputProcessor the processor.
* @return the aggregator spec.
*/
public AggregatorSpec outputProcessor(MessageGroupProcessor outputProcessor) {
To iterate over grouped messages and filter out those with errors if you don't need them downstream.
See more info about correlation details in docs: https://docs.spring.io/spring-integration/reference/aggregator.html