spring-integration

Stop execution splitter


I'm looking to implement an integration flow with a split/aggregate. When a handler throws an exception, I would like to stop the remaining handlers for that element only, and continue iterating over the other split elements.

IntegrationFlow.from(
        WebFlux.inboundGateway("/jira/version")
                .requestMapping(r -> r.methods(HttpMethod.POST)
                        .consumes("application/json"))
                .requestPayloadType(String.class)
                .replyChannel(replyChannel)
                .errorChannel(errorChannel)
                .mappedRequestHeaders(parameter.getJiraHeaderSignature()))
.split()
.handle(handler1, s -> s.advice(advice()))
.handle(handler2, s -> s.advice(advice()))
.handle(handler3, s -> s.advice(advice()))
.aggregate()
.get();





public Advice advice() {
    var advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setReturnFailureExpressionResult(true);
    advice.setOnFailureExpressionString("payload");
    return advice;
}

Example : collection with 2 elements split:

aggregate :

This behaviour is "ok" when I use trapException: the handler chain is stopped, but the aggregate fails because it never completes (see "Trap Exception with splitter").

Is there a way to do this?

UPDATE #1: You are right, the detour works perfectly. But I don't understand how to filter out the ErrorMessage directly in the aggregator. When I do it with a handle() after aggregate(), it works, but how can I implement this directly in the aggregator, please?

...

.handle(handler1, s -> s.advice(advice()))
.handle(handler2, s -> s.advice(advice()))
.channel("jiraAggregatorInputChannel")
.aggregate()
.<List>handle((p, h) -> p.stream()
                         .filter(i -> i instanceof CustomObject)
                         .toList())
.get()




public Advice advice() {
    var advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setTrapException(true);
    advice.setFailureChannelName("jiraAggregatorInputChannel");
    return advice;
}

UPDATE #2:

For me it's not the same thing; see below. When I use the filter outside of the aggregator, my flow correctly returns JSON with a list of CustomObject:

IntegrationFlow.from(
    WebFlux.inboundGateway("/jira/version")
        .requestMapping(r -> r.methods(HttpMethod.POST)
        .consumes("application/json"))
        .requestPayloadType(String.class)
        .replyChannel(replyChannel)
        .errorChannel(errorChannel))
    .handle(handler1, s -> s.advice(advice()))
    .handle(handler2, s -> s.advice(advice()))
    .channel("jiraAggregatorInputChannel")
    .aggregate()
    .<List>handle((p, h) -> p.stream()
        .filter(i -> i instanceof CustomObject)
        .toList())
.get()

Result, list of CustomObject :

[{ "issuesNumberAkuiteo": ....
   "idVersionJira": ....
}]

But when I use the filter inside the aggregator, the flow returns a list of GenericMessage:

IntegrationFlow.from(
        WebFlux.inboundGateway("/jira/version")
            .requestMapping(r -> r.methods(HttpMethod.POST)
            .consumes("application/json"))
            .requestPayloadType(String.class)
            .replyChannel(replyChannel)
            .errorChannel(errorChannel))
        .handle(handler1, s -> s.advice(advice()))
        .handle(handler2, s -> s.advice(advice()))
        .channel("jiraAggregatorInputChannel")
        .aggregate(a -> a.outputProcessor(group -> group
            .getMessages()
            .stream()
            .filter(i -> i.getPayload() instanceof CustomObject)
            .toList()))
    .get()

Result :

[{ "payload": { .... }
   "headers": { .... }
}]

extractReplyPayload on the WebFlux gateway is true by default, so I don't understand why the behaviour is different :(

UPDATE #3:

IntegrationFlow.from(
        WebFlux.inboundGateway("/jira/version")
            .requestMapping(r -> r.methods(HttpMethod.POST)
            .consumes("application/json"))
            .requestPayloadType(String.class)
            .replyChannel(replyChannel)
            .errorChannel(errorChannel))
        .handle(handler1, s -> s.advice(advice()))
        .handle(handler2, s -> s.advice(advice()))
        .channel("jiraAggregatorInputChannel")
        .aggregate(a -> a.outputProcessor(group -> group
            .getMessages()
            .stream()
            .map(Message::getPayload)
            .filter(i -> i instanceof CustomObject)
            .toList()))
    .get()

This throws an exception:

The expected collection of Messages contains non-Message element: class fr.eksae.erp.connector.model.CustomObject: class fr.eksae.erp.connector.model.CustomObject

I can't copy and paste the whole stack trace, but the last frames are:

org.springframework.util.Assert,assignableCheckFailed,Assert.java,610
org.springframework.util.Assert,isAssignable,Assert.java,560
org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler,verifyResultCollectionConsistsOfMessages,AbstractCorrelatingMessageHandler.java,948
org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler,completeGroup,AbstractCorrelatingMessageHandler.java,907

SOLUTION :

IntegrationFlow.from(
    WebFlux.inboundGateway("/jira/version")
        .requestMapping(r -> r.methods(HttpMethod.POST)
        .consumes("application/json"))
        .requestPayloadType(String.class)
        .replyChannel(replyChannel)
        .errorChannel(errorChannel))
    .split()
    .handle(handler1, s -> s.advice(advice()))
    .handle(handler2, s -> s.advice(advice()))
    .channel("jiraAggregatorInputChannel")
    .aggregate(a -> a.outputProcessor(
        group -> new GenericMessage<>(group
            .getMessages()
            .stream()
            .filter(i -> i.getPayload() instanceof CustomObject)
            .map(Message::getPayload)
            .toList())))
.get()

public Advice advice() {
    var advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setTrapException(true);
    advice.setFailureChannelName("jiraAggregatorInputChannel");
    return advice;
}

Solution

  • So, to summarize, you want the failed message to go directly to the aggregator, bypassing the rest of the flow in between.

    Yes, that is possible, and the pattern is called Detour. There is no such component in Spring Integration, because it can be implemented simply by manipulating channels or with a router.

    What I am going to suggest does not quite fit Gregor Hohpe's description of the pattern, but I would still call it a detour, because that is exactly what we are going to do here for your task.

    So, first of all, all the EIP methods in the IntegrationFlow represent endpoints. And as we know, all the endpoints in Spring Integration interact with each other via a MessageChannel. Even if we don't use the channel() DSL method in our IntegrationFlow, there are still DirectChannel beans injected in between. More info in the docs: https://docs.spring.io/spring-integration/reference/dsl/java-basics.html.

    At the same time, no one stops us from declaring something like .channel("jiraAggregatorInputChannel") before that aggregate(). That works because everything from the IntegrationFlow definition is registered as a bean in the application context: the MessageChannel with that name, an AggregatingMessageHandler, and an EventDrivenConsumer that subscribes this handler to that channel. Therefore, at runtime we can simply send a message to that channel and it is going to be handled properly by the aggregator. And that is exactly what happens after your last handle(): it emits its reply into that channel, and so on.

    This explicit channel is the inbound side of our detour.

    Now, look into that ExpressionEvaluatingRequestHandlerAdvice and pay attention to its:

    /**
     * Set the channel to which to send the {@link ErrorMessage} after evaluating the
     * failure expression.
     * @param failureChannel the channel.
     */
    public void setFailureChannel(MessageChannel failureChannel) {
    

    So, when an exception is thrown from your handler, it is going to be sent as an ErrorMessage to that failureChannel.

    You would still need to use trapException = true so that the exception does not bubble up. But at the same time you must not use returnFailureExpressionResult = true, because we don't want the error to be handled by the rest of the logic, but rather to go directly to the aggregator. A detour, in other words!

    Now, since all the messages for the respective correlation group are sent to the aggregator, you can apply some smart logic to filter out those ErrorMessage items in the group, or do something else.
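
    To make the routing easier to picture, here is a minimal plain-Java model of the detour. It deliberately uses no Spring classes: DetourSketch, Failure, Aggregator and demo() are hypothetical names for illustration only, with Failure standing in for an ErrorMessage and the completion rule simplified to "the group has as many entries as items were split". It is a sketch of the idea, not how the framework is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of the detour; all names here are hypothetical.
final class DetourSketch {

    // Stand-in for an ErrorMessage: records which item failed and why.
    record Failure(Object failedPayload, String reason) {}

    // Stand-in for the aggregator: complete once every split item has arrived.
    static final class Aggregator {
        private final int expectedSize;
        private final List<Object> group = new ArrayList<>();

        Aggregator(int expectedSize) {
            this.expectedSize = expectedSize;
        }

        void accept(Object item) {
            group.add(item);
        }

        boolean isComplete() {
            return group.size() == expectedSize;
        }

        List<Object> group() {
            return List.copyOf(group);
        }
    }

    // Runs every split item through the handler chain. A failing item skips the
    // remaining handlers and is sent straight to the aggregator as a Failure.
    static Aggregator run(List<String> items, List<Function<String, String>> handlers) {
        Aggregator aggregator = new Aggregator(items.size());
        for (String item : items) {
            Object result;
            try {
                String current = item;
                for (Function<String, String> handler : handlers) {
                    current = handler.apply(current);
                }
                result = current;
            } catch (RuntimeException e) {
                // The "detour": trap the exception and still deliver something.
                result = new Failure(item, e.getMessage());
            }
            aggregator.accept(result);
        }
        return aggregator;
    }

    // Two items, the second one fails in the first handler.
    static Aggregator demo() {
        Function<String, String> handler1 = s -> {
            if (s.equals("bad")) {
                throw new IllegalStateException("handler1 failed");
            }
            return s + "-1";
        };
        Function<String, String> handler2 = s -> s + "-2";
        return run(List.of("ok", "bad"), List.of(handler1, handler2));
    }

    public static void main(String[] args) {
        Aggregator aggregator = demo();
        System.out.println(aggregator.isComplete());
        System.out.println(aggregator.group());
    }
}
```

    In this model, trapping the exception without delivering anything to the aggregator would leave the group one entry short, which mirrors the "aggregate never completes" symptom from the question.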

    You don't need that setOnFailureExpressionString("payload"), though, because it is the default when a failure channel is configured:

        if (this.onFailureExpression == null
                && (this.failureChannel != null || StringUtils.hasText(this.failureChannelName))) {
    
            this.onFailureExpression = DEFAULT_EXPRESSION;
        }
    

    The DEFAULT_EXPRESSION is a function for Message::getPayload.

    UPDATE

    To manipulate the aggregated group, you can add this option to your aggregator:

                    .aggregate(a -> a
                            .outputProcessor((group) -> group
                                    .getMessages()
                                    .stream()
                                    .map(m -> (String) m.getPayload())
                                    .collect(Collectors.joining(" "))))
    

    I show this as an example, but you can do whatever you want with that stream of messages. Technically it is similar to what you have with that next handle(), but directly in the aggregator API. You can indeed check for the ErrorMessage type in a filter on the stream, if you are not interested in errors downstream.
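
    As a rough illustration of that filtering step, the following plain-Java sketch models an output processor that keeps only the successful payloads and wraps them into a single result. It uses no Spring classes: Msg, Failure, process() and demo() are hypothetical stand-ins, and the CustomObject record here is only a placeholder for the type of the same name in the question. Judging by the exception quoted in the question, a collection returned from the real outputProcessor is expected to contain Message elements, which is presumably why returning one wrapping message (as in the question's SOLUTION) avoids that check.

```java
import java.util.List;

// Plain-Java model of filtering a group inside an output processor; names are hypothetical.
final class OutputProcessorSketch {

    // Stand-in for a message: just a payload holder.
    record Msg(Object payload) {}

    // Placeholder for the domain type used in the question.
    record CustomObject(String id) {}

    // Stand-in for the payload of an error that took the detour.
    record Failure(String reason) {}

    // Keeps only CustomObject payloads and wraps them into ONE result message,
    // instead of returning a bare collection of payloads.
    static Msg process(List<Msg> group) {
        List<CustomObject> payloads = group.stream()
                .map(Msg::payload)
                .filter(CustomObject.class::isInstance)
                .map(CustomObject.class::cast)
                .toList();
        return new Msg(payloads);
    }

    // A group with one success and one failure.
    static Msg demo() {
        List<Msg> group = List.of(
                new Msg(new CustomObject("A-1")),
                new Msg(new Failure("handler1 failed")));
        return process(group);
    }

    public static void main(String[] args) {
        System.out.println(demo().payload());
    }
}
```

    Filtering on the payload type before mapping, or mapping first and then filtering as done here, gives the same result; the part that matters in this sketch is that the processor returns one object carrying the filtered list.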