My application setup is described in my earlier question "Correct way of using spring webclient in spring amqp", where I am trying to use Spring WebClient to make API calls from Spring AMQP RabbitMQ consumer threads. The problem is that the blocking call on the parallel flux stalls, or takes a very long time, after the first few requests are fired.
To reproduce this, I created the minimal setup below.
Dependencies used
Spring Boot 2.2.6.RELEASE
spring-boot-starter-web
spring-boot-starter-webflux
reactor-netty 0.9.14.RELEASE
As mentioned in the linked question, below is the WebClient configuration -
@Bean
public WebClient webClient() {
    ConnectionProvider connectionProvider =
            ConnectionProvider.builder("fixed")
                    .lifo()
                    .pendingAcquireTimeout(Duration.ofMillis(200000))
                    .maxConnections(100)
                    .pendingAcquireMaxCount(3000)
                    .maxIdleTime(Duration.ofMillis(290000))
                    .build();

    HttpClient client = HttpClient.create(connectionProvider);
    // tcpConfiguration returns a new HttpClient, so the result must be kept
    client = client.tcpConfiguration(
            << connection timeout, read timeout, write timeout is set here....>>);

    WebClient.Builder builder =
            WebClient.builder()
                    .baseUrl(<< base URL >>)
                    .clientConnector(new ReactorClientHttpConnector(client));
    return builder.build();
}
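The timeout configuration is elided on purpose; purely as an illustration of what that hook usually looks like with reactor-netty 0.9.x, the values below are assumptions and not the ones from my setup:

import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

// Illustrative timeout wiring only - the concrete values are placeholders
HttpClient client = HttpClient.create(connectionProvider)
        .tcpConfiguration(tcpClient -> tcpClient
                // connection establishment timeout
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000)
                // read/write timeouts added once a connection is established
                .doOnConnected(conn -> conn
                        .addHandlerLast(new ReadTimeoutHandler(30, TimeUnit.SECONDS))
                        .addHandlerLast(new WriteTimeoutHandler(30, TimeUnit.SECONDS))));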
Below is the @Service class that makes the WebClient calls via a parallel flux -
@Service
public class FluxtestService {

    @Autowired
    private WebClient webClient;

    public Flux<Response> getFlux(List<Request> reqList) {
        return Flux.fromIterable(reqList)
                .parallel()
                .runOn(Schedulers.elastic())
                .flatMap(s -> webClient.method(HttpMethod.POST)
                        .uri(<< downstream url >>)
                        .body(BodyInserters.fromValue(s))
                        .exchange()
                        .flatMap(response -> {
                            if (response.statusCode().isError()) {
                                return Mono.just(new Response());
                            }
                            return response.bodyToMono(Response.class);
                        }))
                .sequential();
    }
}
To simulate the Spring AMQP RabbitMQ consumer/listener, I created the @RestController below -
@RestController
public class FluxTestController {

    @Autowired
    private FluxtestService service;

    @PostMapping("/fluxtest")
    public List<Response> getFlux(@RequestBody List<Request> reqList) {
        return service.getFlux(reqList).collectList().block();
    }
}
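For context, the real Spring AMQP consumer that this controller stands in for would look roughly like the sketch below; the queue name and listener wiring are illustrative assumptions, not my actual configuration:

@Component
public class FluxTestListener {

    @Autowired
    private FluxtestService service;

    // "request-queue" is a placeholder queue name for illustration only
    @RabbitListener(queues = "request-queue")
    public void onMessage(List<Request> reqList) {
        // Blocks the listener container thread, just like the controller above
        List<Response> responses = service.getFlux(reqList).collectList().block();
        // ... process responses ...
    }
}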
I tried firing requests from JMeter with around 15 threads. The first few sets of requests are processed very quickly. While requests are being served, I can see log lines like the one below in the log file -
Channel cleaned, now 32 active connections and 68 inactive connections
Once I submit more sets of requests, the active connections keep increasing until they reach the configured maximum of 100, and I don't see them decreasing at all. Up to this point, the response time is OK.
After that, any subsequent requests take a very long time, and the active connection count barely drops even when no requests are being fired.
After some time, I also see the following exception -
reactor.netty.internal.shaded.reactor.pool.PoolAcquireTimeoutException: Pool#acquire(Duration) has been pending for more than the configured timeout of 200000 ms
This suggests that the downstream connections are not being released. Please advise on this issue and possible fixes.
The issue turned out to be that the underlying connection was not released when the downstream call responded with an error status. When using exchange() with WebClient, the response body must always be consumed or explicitly released; otherwise it leads to a connection leak. Below is the change that fixed the issue -
Replace
.flatMap(response -> {
if(response.statusCode().isError()) {
return Mono.just(new Response());
}
return response.bodyToMono(Response.class);
})
with
.flatMap(response -> {
    if (response.statusCode().isError()) {
        // Release the unread response body before falling back,
        // so the pooled connection is returned to the pool
        return response.releaseBody().thenReturn(new Response());
    }
    return response.bodyToMono(Response.class);
})
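As an alternative to releasing the body manually (a sketch only, not what I ended up using), retrieve() together with onStatus can achieve the same thing: consuming the error body in the status handler, for example via ClientResponse.createException() (available from Spring Framework 5.2), also releases the connection. The fallback below is an illustrative assumption:

webClient.method(HttpMethod.POST)
        .uri(<< downstream url >>)
        .body(BodyInserters.fromValue(s))
        .retrieve()
        // createException() reads the error body, which releases the connection
        .onStatus(HttpStatus::isError, response -> response.createException())
        .bodyToMono(Response.class)
        // map error responses to the same default as in the exchange() variant
        .onErrorResume(WebClientResponseException.class, e -> Mono.just(new Response()))

With retrieve() the framework handles the body on the success path as well, so there is no separate branch that can forget to drain it.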