javaspring-integrationdslaggregator

Spring Integration Java DSL -- Configuration of aggregator


I have a very simple integration flow, where a RESTful request is forwarded to two providers using a publish-subscribe channel. The result from both RESTful services is then aggregated in a single array. The sketch of the integration flow is as shown below:

@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel")
            .publishSubscribeChannel(s -> s.applySequence(true)
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider1.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                ).subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider2.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class)
                        )
                )
            )
            .aggregate()
            .get();
}

However, when running my code, the resulting array contains the items returned by only one of the RESTful services. Is there any configuration step I am missing?

UPDATE

The following version corresponds to the full solution, taking into account Artem's comments.

@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel-scatter")
            .publishSubscribeChannel(s -> s.applySequence(true)
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider1.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                        .channel("inputChannel-gather"))
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider2.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                        .channel("inputChannel-gather")))
            .get();
}

@Bean
IntegrationFlow gatherFlow() {
    return IntegrationFlows.from("inputChannel-gather")
            .aggregate(a -> a.outputProcessor(g ->  new GenericMessage<ItemDTO[]>(
                        g.getMessages().stream()
                                .flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload()))
                                .collect(Collectors.toList()).toArray(new ItemDTO[0]))))
            .get();
}

Solution

  • Actually it doesn't work that way.

    The .aggregate() is a third subscriber to that publishSubscribeChannel.

    You have to sever your flow to two of them. Like this:

        @Bean
        public IntegrationFlow publishSubscribeFlow() {
            return flow -> flow
                    .publishSubscribeChannel(s -> s
                            .applySequence(true)
                            .subscribe(f -> f
                                    .handle((p, h) -> "Hello")
                                    .channel("publishSubscribeAggregateFlow.input"))
                            .subscribe(f -> f
                                    .handle((p, h) -> "World!")
                                    .channel("publishSubscribeAggregateFlow.input"))
                    );
        }
    
        @Bean
        public IntegrationFlow publishSubscribeAggregateFlow() {
            return flow -> flow
                    .aggregate(a -> a.outputProcessor(g -> g.getMessages()
                            .stream()
                            .<String>map(m -> (String) m.getPayload())
                            .collect(Collectors.joining(" "))))
                    .channel(c -> c.queue("subscriberAggregateResult"));
        }
    

    Pay attention, please, to the .channel("publishSubscribeAggregateFlow.input") usage from both subscribers.

    To be honest that is a point of any publish-subscribe. We must know where to send the result of all subscribers if we are going to aggregate them.

    Your use-case recalls me the Scatter-Gather EIP pattern.

    We don't have its implementation in the DSL yet. Feel free to raise a GH issue on the matter and we will try to handle it in the upcoming 1.2 version.

    UPDATE

    The GH issue on the matter: https://github.com/spring-projects/spring-integration-java-dsl/issues/75