I've created a Flux that emits an API response on subscription. The API takes a few seconds to respond. I'm also switching the flow to a scheduler with 2 threads.
private Scheduler sc = Schedulers.newBoundedElastic(2, Integer.MAX_VALUE, "newwww-sch");

Flux.just(i)
    .flatMap(x -> {
        try {
            System.out.println(Thread.currentThread().getName() + " Do Request " + i);
            return Mono.just(spectrumAvailabilityApi.getSpectrumAvailability(spectrumAvailabilityRequest));
        } catch (ApiException e) {
            throw new RuntimeException(e);
        }
    })
    .subscribeOn(sc)
    .subscribe(s -> System.out.println(Thread.currentThread().getName() + " Got Response " + i));
When I trigger the flow from parallel threads, I notice that it submits 2 requests and then does nothing until it gets a response from the server for at least one of them.
newwww-sch-1 Do Request 11
newwww-sch-2 Do Request 12
newwww-sch-2 Got Response 12
newwww-sch-1 Got Response 11
newwww-sch-2 Do Request 13
newwww-sch-1 Do Request 14
newwww-sch-1 Got Response 14
newwww-sch-2 Got Response 13
Also, the thread that submits a request is the same thread that processes its response. Is this expected? I expected it to keep submitting requests as long as scheduler threads are available or are merely waiting for a response from the server.
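I was using the blocking OkHttpClient to make the API calls; as a result, each thread stayed blocked until its response arrived and could not take on another request. Wrapping the call in Mono.just(...) does not change this, because the call runs synchronously before the Mono is created. To solve this, I switched to the non-blocking Spring WebClient, which returns a Mono.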
private WebClient webClient = WebClient.builder()
    .baseUrl("https://demo.com/api/demo")
    .defaultHeader("Authorization", token)
    .defaultHeader("Content-Type", "application/json")
    .defaultHeader("Accept", "application/json")
    .build();

UriSpec<RequestBodySpec> uriSpec = webClient.post();
RequestHeadersSpec<?> responseMono = uriSpec.uri("/v1/demo")
    .body(Mono.just(demoRequest), DemoRequest.class)
    .header("Content-Type", APPLICATION_JSON_VALUE)
    .accept(MediaType.valueOf(APPLICATION_JSON_VALUE))
    .header("Authorization", token);
return responseMono.retrieve()
    .bodyToMono(DemoResponse.class);
Using WebClient, I was able to perform non-blocking I/O, and the threads were no longer blocked waiting for the server response.
newwww-sch-1 Do Request 11
newwww-sch-2 Do Request 12
newwww-sch-2 Do Request 13
newwww-sch-1 Do Request 14
newwww-sch-2 Got Response 12
newwww-sch-1 Got Response 11
newwww-sch-1 Got Response 14
newwww-sch-2 Got Response 13
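
The difference between the two logs can be reproduced with the JDK alone, which may help to see why the client matters more than the scheduler. The sketch below is only an analogy and does not use Reactor, OkHttp, or WebClient: a fixed pool of 2 threads stands in for the scheduler, Thread.sleep stands in for a blocking HTTP call, and CompletableFuture.delayedExecutor (Java 9+) stands in for a non-blocking client that completes later. The class name, method names, and the 300 ms latency are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BlockingVsNonBlocking {
    static final int WORKERS = 2;        // same size as the scheduler in the question
    static final int REQUESTS = 4;
    static final long LATENCY_MS = 300;  // assumed server latency, for illustration only

    /** Blocking style: the worker thread is held for the whole round trip. */
    static List<String> runBlocking() {
        List<String> events = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(WORKERS);
        for (int i = 1; i <= REQUESTS; i++) {
            final int id = i;
            pool.execute(() -> {
                events.add("Do Request " + id);
                sleep(LATENCY_MS);                 // stand-in for a blocking HTTP call
                events.add("Got Response " + id);
            });
        }
        shutdownAndWait(pool);
        return events;
    }

    /** Non-blocking style: the worker only starts the request; a timer resumes it later. */
    static List<String> runNonBlocking() {
        List<String> events = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(WORKERS);
        // Tasks handed to this executor reach the pool only after LATENCY_MS has passed.
        Executor afterLatency =
                CompletableFuture.delayedExecutor(LATENCY_MS, TimeUnit.MILLISECONDS, pool);
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 1; i <= REQUESTS; i++) {
            final int id = i;
            pending.add(CompletableFuture
                    .runAsync(() -> events.add("Do Request " + id), pool)
                    .thenRunAsync(() -> events.add("Got Response " + id), afterLatency));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        shutdownAndWait(pool);
        return events;
    }

    /** Counts how many events were logged before the first response arrived. */
    static int requestsBeforeFirstResponse(List<String> events) {
        int count = 0;
        for (String event : events) {
            if (event.startsWith("Got Response")) {
                break;
            }
            count++;
        }
        return count;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void shutdownAndWait(ExecutorService pool) {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking:     " + runBlocking());
        System.out.println("non-blocking: " + runNonBlocking());
    }
}
```

In the blocking variant, a third request cannot start until one of the two workers has logged its response, which matches the first log above. In the non-blocking variant, the workers are free again as soon as each request has been issued, so all four requests should normally be logged before any response, as in the second log.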