I am writing a generic method to call a third-party API from my microservice.
private <R, P> R processRequest(String url, Class<R> type, HttpEntity<P> requestEntity, HttpMethod method) {
    HttpHeaders httpHeaders = requestEntity.getHeaders();
    Mono<R> result = getResult(method, url, httpHeaders, type);
    R responseBody = result.block();
    return responseBody;
}

private <R> Mono<R> getResult(HttpMethod method, String url, HttpHeaders httpHeaders, Class<R> type) {
    return webClient.method(method)
            .uri(url)
            .accept(MediaType.ALL)
            .contentType(MediaType.APPLICATION_JSON)
            .headers(headers -> headers.putAll(httpHeaders))
            .retrieve()
            .onStatus(HttpStatusCode::is4xxClientError,
                    clientResponse -> Mono.error(new RuntimeException("Client error")))
            .onStatus(HttpStatusCode::is5xxServerError,
                    clientResponse -> Mono.error(new RuntimeException("Server error")))
            .bodyToMono(type);
}
But when I call the processRequest() method, I get the error below.
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
Can anyone please help me solve this issue?
Tech stack: Java 17, spring-boot-starter-parent 3.1.1, spring-boot-starter-webflux 3.1.1
P.S. I need a blocking call, and I am aware that it is not good to use blocking operations in reactive code.
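Reactor is a non-blocking library, so calling block() on one of its own HTTP threads (here reactor-http-nio-3) is not a correct usage, and Reactor rejects it with the exception you see. (P.S. An occasional blocking call is still a common requirement, and the Reactor API remains very useful in that case.)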
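To make the cause of the exception more concrete, here is a simplified, plain-Java model of the kind of check involved. As far as I understand, Reactor marks its event-loop threads with a marker interface and block() refuses to run on them. Everything below (BlockingCheckSketch, EventLoopThread, the local NonBlocking interface, the block helper) is a hypothetical stand-in for illustration and not Reactor's real code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical, simplified model of a "no blocking on this thread" check. */
class BlockingCheckSketch {

    /** Marker for threads that must never block (assumed to mirror Reactor's own marker). */
    interface NonBlocking {}

    /** Stand-in for an event-loop thread such as reactor-http-nio-3. */
    static class EventLoopThread extends Thread implements NonBlocking {
        EventLoopThread(Runnable task, String name) {
            super(task, name);
        }
    }

    /** Stand-in for Mono.block(): refuses to run on a NonBlocking thread. */
    static <T> T block(Supplier<T> source) {
        Thread current = Thread.currentThread();
        if (current instanceof NonBlocking) {
            throw new IllegalStateException(
                    "block() is blocking, which is not supported in thread " + current.getName());
        }
        return source.get();
    }

    /** Calls block() on a simulated event-loop thread; returns the error message, or null if none. */
    static String tryOnEventLoop() {
        String[] message = new String[1];
        Thread t = new EventLoopThread(() -> {
            try {
                block(() -> "body");
            } catch (IllegalStateException e) {
                message[0] = e.getMessage();
            }
        }, "reactor-http-nio-3");
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return message[0];
    }

    /** Calls block() on an ordinary pool thread, where blocking is allowed. */
    static String tryOnPlainThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> block(() -> "body");
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

In this model the same block() call fails on the marked thread and succeeds on a plain executor thread, which is why moving the call to a separate pool makes the exception go away.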
In a nutshell, you can create a separate thread pool for your business processing and submit the request to it, so that block() runs on a thread where blocking is allowed.
Here are some code snippets for reference:
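import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

@Component
public class MyApiClient {

    private final WebClient webClient;

    public MyApiClient(WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder.build();
    }

    public <R, P> R processRequest(String url, Class<R> type, HttpEntity<P> requestEntity, HttpMethod method) {
        HttpHeaders httpHeaders = requestEntity.getHeaders();
        Mono<R> result = getResult(method, url, httpHeaders, type);
        return blockMono(result);
    }

    private <R> Mono<R> getResult(HttpMethod method, String url, HttpHeaders httpHeaders, Class<R> type) {
        return webClient.method(method)
                .uri(url)
                .accept(MediaType.ALL)
                .contentType(MediaType.APPLICATION_JSON)
                .headers(headers -> headers.putAll(httpHeaders))
                .retrieve()
                // Spring Boot 3.x: onStatus takes a Predicate<HttpStatusCode>, not HttpStatus
                .onStatus(HttpStatusCode::is4xxClientError,
                        clientResponse -> Mono.error(new RuntimeException("Client error")))
                .onStatus(HttpStatusCode::is5xxServerError,
                        clientResponse -> Mono.error(new RuntimeException("Server error")))
                .bodyToMono(type);
    }

    private <R> R blockMono(Mono<R> mono) {
        // Just create a thread pool here; any way you prefer is fine
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // block() runs on the executor thread, not on the reactor-http-nio thread
            return executor.submit((Callable<R>) mono::block).get();
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            }
            throw new RuntimeException("Error occurred while blocking Mono", e);
        } finally {
            executor.shutdown();
        }
    }
}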
Since you are in a Spring environment, it is better to initialize your business-processing thread pool as a bean managed by Spring, like the following:
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("Business-Pool-");
    executor.initialize();
    return executor;
}
Then inject the bean with an annotation like the following, or pass it in through the constructor so that it is autowired:
@Resource
ThreadPoolTaskExecutor executor;
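To show how a shared pool could replace the per-call executor in blockMono, here is a minimal sketch in plain Java so that it runs without Spring or Reactor. BlockingBridge, withBusinessPool and await are hypothetical names, the timeout parameter is my own addition, and the fixed pool only imitates the "Business-Pool-" bean above; in the real client you would pass mono::block as the callable.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: runs blocking calls on one shared pool instead of a new executor per call. */
class BlockingBridge {

    private final ExecutorService pool;

    BlockingBridge(ExecutorService pool) {
        this.pool = pool;
    }

    /** Builds a fixed pool whose threads carry the "Business-Pool-" prefix (an assumption for this sketch). */
    static BlockingBridge withBusinessPool(int threads) {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads, r -> {
            Thread t = new Thread(r, "Business-Pool-" + counter.incrementAndGet());
            t.setDaemon(true); // do not keep the JVM alive in this sketch
            return t;
        });
        return new BlockingBridge(pool);
    }

    /** Runs the blocking call (for example mono::block) on the pool and waits up to timeoutMillis. */
    <R> R await(Callable<R> blockingCall, long timeoutMillis) {
        Future<R> future = pool.submit(blockingCall);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            future.cancel(true);
            throw new RuntimeException("Interrupted while waiting for the blocking call", e);
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new RuntimeException("Blocking call timed out", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Blocking call failed", e.getCause());
        }
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

With the Spring bean, the same idea should work by calling submit(Callable) on the injected ThreadPoolTaskExecutor, for example executor.submit(result::block).get(...). Keep in mind that the calling thread still waits on Future.get(), so if the caller is itself a reactor-http-nio thread, that thread stays occupied until the response arrives or the timeout expires.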