I have a very simple integration flow in which a RESTful request is forwarded to two providers through a publish-subscribe channel. The results from both RESTful services are then aggregated into a single array. A sketch of the integration flow is shown below:
@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel")
            .publishSubscribeChannel(s -> s.applySequence(true)
                    .subscribe(f -> f
                            .handle(Http.outboundGateway("http://provider1.com/...")
                                    .httpMethod(HttpMethod.GET)
                                    .expectedResponseType(ItemDTO[].class))
                    ).subscribe(f -> f
                            .handle(Http.outboundGateway("http://provider2.com/...")
                                    .httpMethod(HttpMethod.GET)
                                    .expectedResponseType(ItemDTO[].class)
                            )
                    )
            )
            .aggregate()
            .get();
}
However, when running my code, the resulting array contains the items returned by only one of the RESTful services. Is there any configuration step I am missing?
UPDATE
The following version corresponds to the full solution, taking into account Artem's comments.
@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel-scatter")
            .publishSubscribeChannel(s -> s.applySequence(true)
                    .subscribe(f -> f
                            .handle(Http.outboundGateway("http://provider1.com/...")
                                    .httpMethod(HttpMethod.GET)
                                    .expectedResponseType(ItemDTO[].class))
                            .channel("inputChannel-gather"))
                    .subscribe(f -> f
                            .handle(Http.outboundGateway("http://provider2.com/...")
                                    .httpMethod(HttpMethod.GET)
                                    .expectedResponseType(ItemDTO[].class))
                            .channel("inputChannel-gather")))
            .get();
}

@Bean
IntegrationFlow gatherFlow() {
    return IntegrationFlows.from("inputChannel-gather")
            .aggregate(a -> a.outputProcessor(g -> new GenericMessage<ItemDTO[]>(
                    g.getMessages().stream()
                            .flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload()))
                            .collect(Collectors.toList()).toArray(new ItemDTO[0]))))
            .get();
}
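For anyone who wants to check the merging step on its own, the outputProcessor above boils down to flattening several arrays into one. Here is a minimal sketch in plain Java with no Spring dependency. The ItemDTO class below is only a hypothetical stand-in with a single name field (the real DTO is not shown in this post), and GatherSketch and merge are names made up for the illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the real ItemDTO; its actual fields are not shown in the post.
final class ItemDTO {
    final String name;

    ItemDTO(String name) {
        this.name = name;
    }
}

final class GatherSketch {

    // Flattens the per-provider arrays into a single array, keeping the order
    // in which the arrays were supplied.
    static ItemDTO[] merge(List<ItemDTO[]> payloads) {
        return payloads.stream()
                .flatMap(payload -> Arrays.stream(payload))
                .toArray(ItemDTO[]::new);
    }

    public static void main(String[] args) {
        ItemDTO[] fromProvider1 = { new ItemDTO("a") };
        ItemDTO[] fromProvider2 = { new ItemDTO("b"), new ItemDTO("c") };
        ItemDTO[] merged = merge(List.of(fromProvider1, fromProvider2));
        System.out.println(merged.length);
    }
}
```

Calling toArray(ItemDTO[]::new) directly on the stream avoids the intermediate list, but the result should be the same as with collect(Collectors.toList()).toArray(new ItemDTO[0]).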
Actually, it doesn't work that way.
The .aggregate() is a third subscriber to that publishSubscribeChannel.
You have to split your flow into two, like this:
@Bean
public IntegrationFlow publishSubscribeFlow() {
    return flow -> flow
            .publishSubscribeChannel(s -> s
                    .applySequence(true)
                    .subscribe(f -> f
                            .handle((p, h) -> "Hello")
                            .channel("publishSubscribeAggregateFlow.input"))
                    .subscribe(f -> f
                            .handle((p, h) -> "World!")
                            .channel("publishSubscribeAggregateFlow.input"))
            );
}

@Bean
public IntegrationFlow publishSubscribeAggregateFlow() {
    return flow -> flow
            .aggregate(a -> a.outputProcessor(g -> g.getMessages()
                    .stream()
                    .<String>map(m -> (String) m.getPayload())
                    .collect(Collectors.joining(" "))))
            .channel(c -> c.queue("subscriberAggregateResult"));
}
Please pay attention to the .channel("publishSubscribeAggregateFlow.input") usage in both subscribers.
To be honest, that is the point of any publish-subscribe. We must know where to send the results of all the subscribers if we are going to aggregate them.
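To illustrate why both subscribers have to send to the same place, here is a rough plain-Java model of what the aggregator does with the sequence details that applySequence(true) adds (correlation id, sequence number, sequence size). It is only a sketch and not the framework's code: SequenceAggregatorSketch, Reply and accept are names invented for the example, and sorting by sequence number is a choice made here just to get a deterministic result.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Plain-Java model only; none of these types are Spring Integration classes.
final class SequenceAggregatorSketch {

    // Hypothetical stand-in for a reply message carrying the sequence details.
    static final class Reply {
        final String correlationId;
        final int sequenceNumber;
        final int sequenceSize;
        final String payload;

        Reply(String correlationId, int sequenceNumber, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    // Replies received so far, grouped by correlation id.
    private final Map<String, List<Reply>> groups = new HashMap<>();

    // Stores the reply and returns the joined payloads once the group has
    // reached its sequence size; returns empty while replies are still missing.
    Optional<String> accept(Reply reply) {
        List<Reply> group = groups.computeIfAbsent(reply.correlationId, id -> new ArrayList<>());
        group.add(reply);
        if (group.size() < reply.sequenceSize) {
            return Optional.empty();
        }
        groups.remove(reply.correlationId);
        // Assumption of this sketch: order the payloads by sequence number.
        group.sort(Comparator.comparingInt(r -> r.sequenceNumber));
        return Optional.of(group.stream()
                .map(r -> r.payload)
                .collect(Collectors.joining(" ")));
    }

    public static void main(String[] args) {
        SequenceAggregatorSketch aggregator = new SequenceAggregatorSketch();
        System.out.println(aggregator.accept(new Reply("request-1", 2, 2, "World!")));
        System.out.println(aggregator.accept(new Reply("request-1", 1, 2, "Hello")));
    }
}
```

If one of the subscribers never delivers its reply to this shared entry point, the group never reaches its sequence size and nothing is released, which is why the common channel matters.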
Your use case reminds me of the Scatter-Gather EIP pattern.
We don't have an implementation of it in the DSL yet.
Feel free to raise a GH issue on the matter and we will try to handle it in the upcoming 1.2 version.
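In case the pattern is unfamiliar, the idea of Scatter-Gather can be sketched in plain Java without any Spring Integration involved: send the same request to every provider, then collect all the replies into one result. This is only an illustration under my own assumptions. ScatterGatherSketch and scatterGather are made-up names, the providers are modelled as simple functions, and this is not how the framework implements it.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java illustration of the pattern; not Spring Integration code.
final class ScatterGatherSketch {

    // Scatter: hand the same request to every provider asynchronously.
    // Gather: wait for each reply and flatten them into one list,
    // in the order the providers were listed.
    static <Q, R> List<R> scatterGather(Q request, List<Function<Q, List<R>>> providers) {
        List<CompletableFuture<List<R>>> pending = providers.stream()
                .map(provider -> CompletableFuture.supplyAsync(() -> provider.apply(request)))
                .collect(Collectors.toList());
        return pending.stream()
                .flatMap(future -> future.join().stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Two hypothetical providers standing in for the two REST services.
        List<Function<String, List<String>>> providers = List.of(
                query -> List.of(query + "-from-provider1"),
                query -> List.of(query + "-from-provider2"));
        System.out.println(scatterGather("item", providers));
    }
}
```

The two-flow configuration above does the same thing with channels: the publish-subscribe channel is the scatter part and the aggregator flow is the gather part.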
UPDATE
The GH issue on the matter: https://github.com/spring-projects/spring-integration-java-dsl/issues/75