
Spring Integration: when aggregating multithreaded HTTP requests, only the last request (the one that releases the group) gets a reply and the others time out


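I am trying to aggregate the payloads of multiple HTTP requests and process them together. To do so, I created a Spring Boot app that contains a REST controller, a messaging gateway, an integration flow with an aggregator, and a message group handler.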

The issue is:

Given a sequence size of, for example, 3, when I send 3 requests consecutively, the group is released and a reply is sent only once, for the last request (the one that releases the group). My first 2 requests hit the Messaging Gateway timeout and get a null response object.

To illustrate:

  1. request 1: gateway timed out and reply object is null
  2. request 2: gateway timed out and reply object is null
  3. request 3: reply object returned successfully

The RestController code is below:

@RestController
@AllArgsConstructor
public class MyController {

    private final MyIntegrationGateway myIntegrationGateway;

    @PostMapping("/myEndpoint1")
    public ResponseEntity<String> myEndpoint1(@RequestBody MyDataObject payload) {

        var result = myIntegrationGateway.sendToFlow(payload, "GROUP_1", 3);

        return new ResponseEntity<>(result, HttpStatus.CREATED);
    }
}

The MessagingGateway code is below:

@MessagingGateway
public interface MyIntegrationGateway {

    @Gateway(requestChannel = "requestQueue", replyChannel = "responseQueue")
    String sendToFlow(MyDataObject payload,
                        @Header(IntegrationMessageHeaderAccessor.CORRELATION_ID) String correlationId,
                        @Header(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE) int groupSize) throws MessagingException;

}

The IntegrationFlow and channels definitions code is below:

    @Bean
    public IntegrationFlow myAggregatorFlow(MyGroupHandler myGroupHandler) {
        return IntegrationFlow
                .from(requestQueue())
                .aggregate(a -> a
                        .releaseStrategy(new SequenceSizeReleaseStrategy())
                        .expireGroupsUponCompletion(true)
                        .sendPartialResultOnExpiry(true))
                .handle(myGroupHandler)
                .channel("responseQueue")
                .get();
    }

    @Bean
    public MessageChannel requestQueue() {
        return MessageChannels.direct().getObject();
    }

    @Bean
    public MessageChannel responseQueue() {
        return MessageChannels.direct().getObject();
    }

The message group handler code is below:


@MessageEndpoint
public class MyGroupHandler implements GenericHandler<List<MyDataObject>> {


    @Override
    public Object handle(List<MyDataObject> payloadGroup, MessageHeaders headers) {

        if (payloadGroup.isEmpty()) {
            throw new IllegalArgumentException("Payload Group should not be empty");
        }
        .....
        
        return results;
    }
}

Note: For responseQueue, I also tried MessageChannels.publishSubscribe().getObject() as the channel type, but it made no difference. The reply is still sent only for the last request.

What do I expect?

I expect every request to receive the same result object, sent once the group has been processed, without any of them waiting for the Messaging Gateway timeout.


Solution

  • So, you call the sendToFlow() method 3 times, but only one result is produced, for the third call. I think that is not how Java works: every call to a method with a return value expects its own result. On top of that, these all look like HTTP requests, and that is not how the HTTP protocol works either: when you send a request, a response is expected.

    You should rethink the logic and consider using a splitter for a single top-level request: https://docs.spring.io/spring-integration/reference/splitter.html. That way you can extract the item requests from it and aggregate them at the end.

    For a "multiple HTTP requests, single response" scenario you would need to come up with a custom solution in which you ignore the responses to the interim requests and deal only with the terminal one. The gateway contract might need to be revised as well, because a blocking method that returns a value does not fit a flow that produces one result per group.
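
If every caller should instead receive the same group result, one possible shape for such a custom solution is to hand each request a future that is shared by its whole group and completed once, when the group is released. The sketch below is plain Java and does not use Spring Integration at all. `GroupReplyBroadcaster`, `submit`, and the group handler function are hypothetical names introduced for illustration, and timeouts or expiry of incomplete groups are deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical helper: collects payloads per group and completes one shared future on release. */
final class GroupReplyBroadcaster<T, R> {

    /** Payloads collected so far plus the reply future shared by all members of the group. */
    private static final class Group<P, V> {
        final List<P> payloads = new ArrayList<>();
        final CompletableFuture<V> reply = new CompletableFuture<>();
    }

    private final Map<String, Group<T, R>> openGroups = new HashMap<>();
    private final int groupSize;
    private final Function<List<T>, R> groupHandler;

    GroupReplyBroadcaster(int groupSize, Function<List<T>, R> groupHandler) {
        this.groupSize = groupSize;
        this.groupHandler = groupHandler;
    }

    /** Adds a payload to its group and returns the future shared by every member of that group. */
    CompletableFuture<R> submit(String groupId, T payload) {
        Group<T, R> released = null;
        CompletableFuture<R> reply;
        synchronized (this) {
            Group<T, R> group = openGroups.computeIfAbsent(groupId, id -> new Group<>());
            group.payloads.add(payload);
            reply = group.reply;
            if (group.payloads.size() == groupSize) {
                // The group is full: close it so the next payload starts a fresh group.
                openGroups.remove(groupId);
                released = group;
            }
        }
        if (released != null) {
            // Process outside the lock; completing the shared future wakes up every waiting caller.
            try {
                released.reply.complete(groupHandler.apply(released.payloads));
            } catch (RuntimeException e) {
                released.reply.completeExceptionally(e);
            }
        }
        return reply;
    }

    public static void main(String[] args) {
        GroupReplyBroadcaster<String, String> broadcaster =
                new GroupReplyBroadcaster<>(3, parts -> String.join("+", parts));
        CompletableFuture<String> first = broadcaster.submit("GROUP_1", "a");
        CompletableFuture<String> second = broadcaster.submit("GROUP_1", "b");
        CompletableFuture<String> third = broadcaster.submit("GROUP_1", "c");
        // All three callers observe the same result: a+b+c a+b+c a+b+c
        System.out.println(first.join() + " " + second.join() + " " + third.join());
    }
}
```

In a controller, each request thread could wait on the returned future with a bounded `get(timeout, unit)`, or return the `CompletableFuture` itself, which Spring MVC handles as an asynchronous result. Either way, the first two requests get their reply when the third one arrives rather than timing out.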