I have to execute multiple API calls simultaneously which are independent of each other:
Mono<Response1> response1= this.webClient
.post()
.uri(requestURI1)
.body(Flux.just(request.getRequestBody()), ParameterizedTypeReference<T>)
.exchangeToMono(response -> {
return response.statusCode().equals(HttpStatus.OK)
? response.bodyToMono(ParameterizedTypeReference<T>)
: response.createException().flatMap(Mono::error);
});
Mono<Response2> response2= this.webClient
.post()
.uri(requestURI2)
.body(Flux.just(request.getRequestBody()), ParameterizedTypeReference<T>)
.exchangeToMono(response -> {
return response.statusCode().equals(HttpStatus.OK)
? response.bodyToMono(ParameterizedTypeReference<T>)
: response.createException().flatMap(Mono::error);
});
Mono<Response3> response3= this.webClient
.post()
.uri(requestURI3)
.body(Flux.just(request.getRequestBody()), ParameterizedTypeReference<T>)
.exchangeToMono(response -> {
return response.statusCode().equals(HttpStatus.OK)
? response.bodyToMono(ParameterizedTypeReference<T>)
: response.createException().flatMap(Mono::error);
});
How can I get the response of above api calls in separate Objects and at the same time, they should be executed in parallel? Also, after executing these above calls and putting the data in separate objects, say, Response1, Response2, Response3, I want to execute another API call which consumes these responses Response1, Response2, Response3.
I have tried to use Flux.merge but this will merge the responses in single object which is not correct. Also read about Mono.zip, but these all are used to combine the responses which I don't want.
EDIT: Mono.zip works perfectly. I have a follow up question which I have mentioned in comments but posting it here also. So I have implemented like this:
Mono.zip(rs1, rs2, rs3).flatMap(tuples -> {
//do something with responses
return Mono.just(transformedData)
}).flatMap(transformedData -> {
//here another webclient.post call which consumes transformedData and return actualData in form of Mono<actualData>
Mono<actualData> data= callAPI;
return data;
});
Now this response is propagated to rest layer in form of Mono<actualData> and I am getting this in response: {
"scanAvailable": true
}
To combine publishers in parallel you can use Mono.zip
that will return TupleX<...>
and will be resolved when all publishers are resolved.
Mono<Tuple3<Response1, Response2, Response3>> res = Mono.zip(response1, response2, response3)
.map(tuples -> {
//do something with responses
return transformedData;
})
.flatMap(transformedData -> {
//here another webclient.post call which consumes transformedData and return actualData in form of Mono<actualData>
return callAPI(transformedData);
});
Depending on the error handling logic you could consider zipDelayError
As I mentioned in comment one of the key things with reactive API is to differentiate sync and async operations. In case of sync transformation, chain it with .map
and use .flatMap
or similar operator in case you want to chain another async method.