spring-integration spring-integration-dsl

Spring Integration - recommended splitter/aggregator setup for async subflow replying in an undefined time in the future


I have a flow with a splitter and an aggregator that uses a gateway to call another complex flow with asynchronous subflows.

After upgrading to Spring Integration 6.x, I configured the gateway with replyTimeout set to -1 so that it blocks indefinitely. However, the upgrade docs say that this value, which used to be the default, was changed because:

According to distributed systems design and bad experience of demo developing it is not OK to block forever.

Given that premise, what is the recommended setup for a splitter/aggregator where the splitter subflow calls another flow that will reply at some undefined time in the future, instead of having a gateway call that flow and wait with an infinite timeout?

return from( "inputChannel" )
    .transform( data -> transformData( data ) )
    .splitWith( splitter -> splitter
        .applySequence( true )
    )
    .gateway( "initiateAutoScheduledScenarioFlow.input", gateway -> gateway
        .requiresReply( true )
        .replyTimeout( -1L )
    )
    .aggregate();

Solution

  • You use .gateway() to wait for a reply and pass it on to aggregate(), so that step is blocking. It also looks like the whole flow is called from the main flow, which in turn waits for a reply from that aggregator.

    Right now there is no way to make .gateway() asynchronous. I think it could be supported, though, since nothing downstream requires that step to block: the aggregator holds the flow either way until the whole split group has been gathered.

    As a solution, I would suggest dividing your current flow in two, with an input channel for the aggregator. The initiateAutoScheduledScenarioFlow should then use this aggregator input channel as its output at the end of its processing.

    Something like this:

    
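    // First half: split and hand each item to the subflow without waiting for a reply.
    from( "inputChannel" )
        .transform( data -> transformData( data ) )
        .splitWith( splitter -> splitter
            .applySequence( true )
        )
        .channel( "initiateAutoScheduledScenarioFlow.input" )
        .get();
    
    
    // Second half: the aggregator now has its own input channel.
    from( "aggregatorInputChannel" )
        .aggregate()
        .get();
    
    IntegrationFlow initiateAutoScheduledScenarioFlow() {
        return flow -> flow
                    .handle()  // placeholder for the existing processing steps
                    .channel( "aggregatorInputChannel" );
    }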
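
To illustrate why the divided flow does not need a waiting thread, here is a minimal plain-JDK sketch of the bookkeeping an aggregator performs. It is not Spring Integration code: `Part`, `SequenceAggregator` and `SequenceAggregatorDemo` are hypothetical names, and the model keeps only the correlation id and sequence size that the release decision needs. Payloads are returned in arrival order, which is a simplification chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of one split item: payload plus the details needed to regroup it.
final class Part {
    final String correlationId;
    final int sequenceSize;
    final String payload;

    Part(String correlationId, int sequenceSize, String payload) {
        this.correlationId = correlationId;
        this.sequenceSize = sequenceSize;
        this.payload = payload;
    }
}

// Hypothetical stand-in for an aggregator; not a Spring Integration class.
final class SequenceAggregator {
    private final Map<String, List<String>> groups = new HashMap<>();

    // Called whenever a result arrives, on whichever thread delivers it.
    // Returns the gathered payloads once the group is complete, otherwise empty.
    synchronized Optional<List<String>> accept(Part part) {
        List<String> group = groups.computeIfAbsent(part.correlationId, id -> new ArrayList<>());
        group.add(part.payload);
        if (group.size() < part.sequenceSize) {
            return Optional.empty(); // group incomplete, but no thread is parked waiting
        }
        groups.remove(part.correlationId);
        return Optional.of(group);
    }
}

class SequenceAggregatorDemo {
    public static void main(String[] args) {
        SequenceAggregator aggregator = new SequenceAggregator();
        // Results may arrive in any order and at any time.
        System.out.println(aggregator.accept(new Part("job-1", 2, "b")).isPresent()); // false
        System.out.println(aggregator.accept(new Part("job-1", 2, "a")).get());       // [b, a]
    }
}
```

In the real flow the equivalent information travels in the sequence headers added by applySequence( true ), so the asynchronous subflow needs to carry those headers through to aggregatorInputChannel for the group to complete.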
    

    Another way is to use a @MessagingGateway interface whose gateway method returns a CompletableFuture<>. I don't think that would be any easier, though, since the from( "inputChannel" ) flow still has to be divided.

    Please raise a GitHub issue to request asynchronous behavior for the gateway() operator.
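
For the CompletableFuture route, the following plain-JDK sketch shows how replies could be gathered with a bounded wait rather than an infinite one. It assumes a gateway method that returns a future per item; `ScenarioGateway`, `FutureGather` and the fake implementation in the demo are hypothetical stand-ins, and no Spring types are used, so the wiring of the real gateway is left out.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// Same shape as a gateway method returning CompletableFuture. In Spring Integration the
// interface would carry @MessagingGateway; here it is a plain interface (assumption).
interface ScenarioGateway {
    CompletableFuture<String> initiate(String item);
}

final class FutureGather {
    // Fans out one call per item. The returned future completes when all replies are in,
    // or fails with a TimeoutException once the given bound has passed.
    static CompletableFuture<List<String>> gather(
            ScenarioGateway gateway, List<String> items, long timeout, TimeUnit unit) {
        List<CompletableFuture<String>> replies = items.stream()
                .map(gateway::initiate)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(replies.toArray(new CompletableFuture[0]))
                .thenApply(done -> replies.stream()
                        .map(CompletableFuture::join) // every reply is complete here
                        .collect(Collectors.toList()))
                .orTimeout(timeout, unit);
    }
}

class FutureGatherDemo {
    public static void main(String[] args) {
        // Fake gateway that replies asynchronously with the upper-cased item.
        ScenarioGateway fake = item -> CompletableFuture.supplyAsync(() -> item.toUpperCase());
        // join() is used only so the demo can print the result before exiting.
        System.out.println(
                FutureGather.gather(fake, List.of("a", "b"), 5, TimeUnit.SECONDS).join()); // [A, B]
    }
}
```

The bound passed to orTimeout plays the role that a finite replyTimeout would play on the gateway: a reply that never arrives surfaces as a failure instead of a thread that waits forever.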