java spring-boot spring-webflux spring-webclient

How to block() on a Reactor HTTP thread while calling a third-party API using Spring Boot WebClient?


I am writing a generic method to call a third-party API from my microservice.

private <R,P> R processRequest(String url, Class<R> type, HttpEntity<P> requestEntity, HttpMethod method) {

    HttpHeaders httpHeaders = requestEntity.getHeaders();

    Mono<R> result =  getResult(method, url, httpHeaders, type);
    R  responseBody =  result.block();

    return responseBody;
}

private <R> Mono<R> getResult(HttpMethod method, String url, HttpHeaders httpHeaders, Class<R> type) {
    return webClient.method(method)
            .uri(url)
            .accept(MediaType.ALL)
            .contentType(MediaType.APPLICATION_JSON)
            .headers(headers -> headers.putAll(httpHeaders))
            .retrieve()
            .onStatus(HttpStatusCode::is4xxClientError,
                    clientResponse -> Mono.error(new RuntimeException("Client error")))
            .onStatus(HttpStatusCode::is5xxServerError,
                    clientResponse -> Mono.error(new RuntimeException("Server error")))
            .bodyToMono(type);
}

But when I call the processRequest() method, I get the error below.

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3

Can anyone please help me solve this issue?

Tech stack: Java 17, spring-boot-starter-parent 3.1.1, spring-boot-starter-webflux 3.1.1

P.S. I need a blocking call, and I am aware that blocking operations are discouraged in reactive code.


Solution

  • Reactor is a non-blocking library, so calling block() on one of the HTTP event-loop threads (such as reactor-http-nio-3) is not supported. That said, an occasional blocking call is sometimes required, and the Reactor API is still useful in that case.

    In a nutshell, you can run the blocking call on a separate business thread pool instead of on the Reactor HTTP thread.

    Here are some code snippets for reference:

    @Component
    public class MyApiClient {
    
        private final WebClient webClient;
    
        public MyApiClient(WebClient.Builder webClientBuilder) {
            this.webClient = webClientBuilder.build();
        }
    
        public <R, P> R processRequest(String url, Class<R> type, HttpEntity<P> requestEntity, HttpMethod method) {
            HttpHeaders httpHeaders = requestEntity.getHeaders();
            Mono<R> result = getResult(method, url, httpHeaders, type);
            return blockMono(result);
        }
    
        private <R> Mono<R> getResult(HttpMethod method, String url, HttpHeaders httpHeaders, Class<R> type) {
            return webClient.method(method)
                    .uri(url)
                    .accept(MediaType.ALL)
                    .contentType(MediaType.APPLICATION_JSON)
                    .headers(headers -> headers.putAll(httpHeaders))
                    .retrieve()
                    .onStatus(HttpStatusCode::is4xxClientError,
                            clientResponse -> Mono.error(new RuntimeException("Client error")))
                    .onStatus(HttpStatusCode::is5xxServerError,
                            clientResponse -> Mono.error(new RuntimeException("Server error")))
                    .bodyToMono(type);
        }
    
        private <R> R blockMono(Mono<R> mono) {
            // Any executor works here; a throwaway single-thread executor keeps the example short.
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                return executor.submit((Callable<R>) mono::block).get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException("Error occurred while blocking Mono", e);
            } finally {
                executor.shutdown();
            }
        }
    }
    

    Because you are in a Spring environment, it is better to define the business thread pool once as a bean and let Spring inject it, for example:

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("Business-Pool-");
        executor.initialize();
        return executor;
    }
    

    Inject the bean with an annotation as shown below, or through constructor injection:

    @Resource
    ThreadPoolTaskExecutor executor;
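
The ideas above can be combined: rather than creating an executor on every call, the blocking helper can submit to one shared pool and wait with a timeout. The following is a minimal sketch under stated assumptions, not the answer's original code. BlockingBridge, await, and the timeout are hypothetical names introduced for illustration, and a plain Callable stands in for mono::block so the example runs without Spring or Reactor on the classpath.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: runs a blocking call on a shared pool and waits for the result. */
final class BlockingBridge {

    private final ExecutorService pool; // shared pool, created once and reused
    private final Duration timeout;     // upper bound on how long the caller waits

    BlockingBridge(ExecutorService pool, Duration timeout) {
        this.pool = pool;
        this.timeout = timeout;
    }

    /** Assumption: in the WebClient case the callable would be something like mono::block. */
    <R> R await(Callable<R> blockingCall) {
        Future<R> future = pool.submit(blockingCall);
        try {
            // The calling thread waits here, but never longer than the timeout.
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker so it does not linger
            throw new IllegalStateException("Blocking call timed out after " + timeout, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            future.cancel(true);
            throw new IllegalStateException("Interrupted while waiting for blocking call", e);
        } catch (ExecutionException e) {
            // Unwrap so the caller sees the original failure as the cause.
            throw new IllegalStateException("Blocking call failed", e.getCause());
        }
    }
}
```

In the Spring setup described above, the pool would presumably be the injected ThreadPoolTaskExecutor (for instance via its getThreadPoolExecutor() method), and the call would look like bridge.await(mono::block). Note that the calling thread still waits inside Future.get, so if the caller is itself a Reactor event-loop thread, the timeout only bounds the stall; it does not make the call non-blocking.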