spring-bootmonospring-webfluxproject-reactorflux

Reactor Flux remains blocks till getting response from server


I'v created a Flux which emits API response on subscription. The API takes few seconds to respond. I'm also switching the flow to a sceduler with 2 threads.

private Scheduler sc = Schedulers.newBoundedElastic(2, Integer.MAX_VALUE, "newwww-sch");

Flux.just(i)
        .flatMap(x -> {
          try {
            System.out.println(Thread.currentThread().getName() + " Do Request " + i);
            return Mono.just(spectrumAvailabilityApi.getSpectrumAvailability(spectrumAvailabilityRequest));
          } catch (ApiException e) {
            throw new RuntimeException(e);
          }
        })
    .subscribeOn(sc)
    .subscribe(s -> System.out.println(Thread.currentThread().getName() + " Got Response " + i));

When I trigger the flow using paraller threads, I notice it submits 2 requests and then does nothing till it gets response from server for atleast one of the requests.

newwww-sch-1 Do Request 11
newwww-sch-2 Do Request 12
newwww-sch-2 Got Response 12
newwww-sch-1 Got Response 11
newwww-sch-2 Do Request 13
newwww-sch-1 Do Request 14
newwww-sch-1 Got Response 14
newwww-sch-2 Got Response 13

Also, the thread submitting the request is the same thread processing the response of the request. Is this expected? My expectation was that it will submit the requests as long as the scheduler threads are available or the threads are waiting for the response from server.


Solution

  • I was using blocking OkHttpClient for making the API calls, as a result the thread remains blocked and cannot take more request. To solve it, I switched to non-blocking Spring WebClient which returns a Mono.

    private WebClient webClient = WebClient.builder()
        .baseUrl("https://demo.com/api/demo")
        .defaultHeader("Authorization", token)
        .defaultHeader("Content-Type", "application/json")
        .defaultHeader("Accept", "application/json")
        .build();
    UriSpec<RequestBodySpec> uriSpec = webClient.post();
    
    RequestHeadersSpec<?> responseMono = uriSpec.uri("/v1/demo")
        .body(Mono.just(demoRequest), DemoRequest.class)
        .header("Content-Type", APPLICATION_JSON_VALUE)
        .accept(MediaType.valueOf(APPLICATION_JSON_VALUE))
        .header("Authorization", token);
    
    return responseMono.retrieve()
        .bodyToMono(DemoResponse.class);
    

    Using WebClient I was able to perform non-blocking IO and threads were not blocked for server response.

    newwww-sch-1 Do Request 11
    newwww-sch-2 Do Request 12
    newwww-sch-2 Do Request 13
    newwww-sch-1 Do Request 14
    newwww-sch-2 Got Response 12
    newwww-sch-1 Got Response 11
    newwww-sch-1 Got Response 14
    newwww-sch-2 Got Response 13