My end goal with this is to implement a way to make composite API calls within the body of a gateway route filter. I have a very basic demo application running on port 9000
and exposing a few endpoints. Here is the REST controller:
@RestController
@RequestMapping("/composite")
public class CompositeCallController {
@GetMapping("/test/one")
public Map<String, Object> first() {
Map<String, Object> output = new HashMap<>();
output.put("response-1-1", "FIRST 1");
output.put("response-1-2", "FIRST 2");
output.put("response-1-3", "FIRST 3");
return output;
}
@GetMapping("/test/two")
public Map<String, Object> second() {
Map<String, Object> output = new HashMap<>();
output.put("response-2-1", "SECOND 1");
output.put("response-2-2", "SECOND 2");
output.put("response-2-3", "SECOND 3");
return output;
}
@GetMapping
public Map<String, Object> init() {
return new HashMap<>();
}
}
Both controllers return just a plain Map with a few entries inside. I have a Spring Cloud Gateway application running on a separate port, and I have configured via YML a route that leads to the localhost:9000/composite
endpoint, which returns a blank map. Then I have a ModifyResponseBodyGatewayFilterFactory
filter that kicks in and creates two brand new requests towards the two other endpoints in my demo application.
I want to aggregate those two responses into one by transferring them into a new map that I return to the filter chain. Here's how my filter looks:
public GatewayFilter apply(final Config config) {
final ModifyResponseBodyGatewayFilterFactory.Config modifyResponseBodyFilterFactoryConfig = new ModifyResponseBodyGatewayFilterFactory.Config();
modifyResponseBodyFilterFactoryConfig.setRewriteFunction(Map.class, Map.class, (exchange, body) -> {
WebClient client = WebClient.create();
Mono<Map<String, Object>> firstCallMono = client.get()
.uri(FIRST_SERVICE_URL)
.retrieve()
.bodyToMono(json);
Mono<Map<String, Object>> secondCallMono = client.get()
.uri(SECOND_SERVICE_URL)
.retrieve()
.bodyToMono(json);
Map<String, Object> output = new HashMap<>();
Mono.zip(firstCallMono, secondCallMono)
.log()
.subscribe(v -> {
System.out.println("FIRST VALUE = " + v.getT1());
System.out.println("SECOND VALUE = " + v.getT2());
output.put("1", v.getT1());
output.put("2", v.getT2());
});
System.out.println("OUTPUT VALUE 1 = " + output.get("1"));
System.out.println("OUTPUT VALUE 2 = " + output.get("2"));
return Mono.just(output);
});
return modifyResponseBodyFilterFactory.apply(modifyResponseBodyFilterFactoryConfig);
}
The json
type is defined as private final ParameterizedTypeReference<Map<String, Object>> json = new ParameterizedTypeReference<>() {};
The URIs are as follows:
public static final String FIRST_SERVICE_URL = "http://localhost:9000/composite/test/one";
public static final String SECOND_SERVICE_URL = "http://localhost:9000/composite/test/two";
And here's my gateway config for reference:
logging:
level:
reactor:
netty: INFO
org:
springframework:
cloud:
gateway: TRACE
spring:
codec:
max-in-memory-size: 20MB
cloud:
gateway:
httpclient:
wiretap: true
httpserver:
wiretap: true
routes:
- id: composite-call-test
uri: http://localhost:9000
predicates:
- Path=/composite/**
filters:
- CompositeApiCallFilter
To merge the Monos, I use Mono.zip()
as it seems to serve just that goal. I've purposefully put two System.out.println()
s within the zip()
body to make sure the responses from the above two WebClient requests are actually correct, and it definitely seems so:
FIRST VALUE = {response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1}
SECOND VALUE = {response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}
However, I've also put two console prints after the zip()
to check if something is populated in the map, and it's completely empty for some reason:
OUTPUT VALUE 1 = null
OUTPUT VALUE 2 = null
Here's the full console output from the request for reference:
2022-05-13 14:53:22.087 INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1 : onSubscribe([Fuseable] MonoZip.ZipCoordinator)
2022-05-13 14:53:22.090 INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1 : request(unbounded)
OUTPUT VALUE 1 = null
OUTPUT VALUE 2 = null
2022-05-13 14:53:22.139 INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1 : onNext([{response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1},{response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}])
FIRST VALUE = {response-1-2=FIRST 2, response-1-3=FIRST 3, response-1-1=FIRST 1}
SECOND VALUE = {response-2-3=SECOND 3, response-2-1=SECOND 1, response-2-2=SECOND 2}
2022-05-13 14:53:22.140 INFO 72992 --- [ctor-http-nio-3] reactor.Mono.Zip.1 : onComplete()
I tried a bunch of other ways of doing the above, for example merging the two Mono
s into a Flux
by using firstCallMono.mergeWith(secondCallMono)
and then subscribing to the resulting Flux
object and populating the map, but the result is identical.
I also tried putting the two Mono
s into a Pair
object and extracting the values like so:
Pair<Mono<Map<String, Object>>, Mono<Map<String, Object>>> pair = new Pair(firstCall, secondCallDTOMono);
pair.getValue0().log().subscribe(v -> output.put("1", v));
pair.getValue1().log().subscribe(v -> output.put("2", v));
But again, the output
map is empty at the end, and I don't understand why. It seems like whatever comes back from the WebClient .get()
call is of type MonoFlapMap.FlatMapMain
and I suspect the issue comes from unpacking the values from this type into my regular HashMap, but I don't know how to resolve that issue. I tried using .map()
and .flatMap()
but neither worked.
Could someone please let me know how to extract those values?
Thanks to the advice of Toerktumlare, I was able to make it work properly. Here's the entire filter for reference:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.cloud.gateway.filter.factory.rewrite.ModifyResponseBodyGatewayFilterFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.util.HashMap;
import java.util.Map;
@Component
public class CompositeApiCallFilter extends AbstractGatewayFilterFactory<CompositeApiCallFilter.Config> {
public static final String COMPOSITE_TEST_URL = "http://localhost:9000/composite/test/";
private final ModifyResponseBodyGatewayFilterFactory modifyResponseBodyFilterFactory;
private final ParameterizedTypeReference<Map<String, Object>> json = new ParameterizedTypeReference<>() {
};
@Autowired
public CompositeApiCallFilter(ModifyResponseBodyGatewayFilterFactory factory) {
super(Config.class);
this.modifyResponseBodyFilterFactory = factory;
}
@Override
public GatewayFilter apply(final Config config) {
final ModifyResponseBodyGatewayFilterFactory.Config modifyResponseBodyFilterFactoryConfig = new ModifyResponseBodyGatewayFilterFactory.Config();
modifyResponseBodyFilterFactoryConfig.setRewriteFunction(Map.class, Map.class, (exchange, body) -> {
WebClient client = WebClient.create();
Mono<Map<String, Object>> firstCallMono = client.get()
.uri(COMPOSITE_TEST_URL + "one")
.retrieve()
.bodyToMono(json);
Mono<Map<String, Object>> secondCallMono = client.get()
.uri(COMPOSITE_TEST_URL + "two")
.retrieve()
.bodyToMono(json);
Map<String, Object> output = new HashMap<>();
return Mono.zip(firstCallMono, secondCallMono)
.flatMap(v -> {
output.put("1", v.getT1());
output.put("2", v.getT2());
return Mono.just(output);
});
});
return modifyResponseBodyFilterFactory.apply(modifyResponseBodyFilterFactoryConfig);
}
public static class Config {
}
}
And the respective output in Postman:
{
"1": {
"response-1-2": "FIRST 2",
"response-1-3": "FIRST 3",
"response-1-1": "FIRST 1"
},
"2": {
"response-2-3": "SECOND 3",
"response-2-1": "SECOND 1",
"response-2-2": "SECOND 2"
}
}
Seems like subscribing wasn't necessary at all, just zipping the monos and extracting their values with flatMap
worked well.