We're migrating from Spring Boot 2 to 3 and also getting rid of Netflix Ribbon. Since the migration, discovering a service through Consul fails. If we roll back to Spring Boot 2 + Netflix Ribbon, it works without problems, so we have ruled out a connectivity issue.
Logged errors:
RoundRobinLoadBalancer|No servers available for service: cachedavailability-integrations-service
ReactorLoadBalancerExchangeFilterFunction|LoadBalancer does not contain an instance for the service cachedavailability-integrations-service
Communication error with uri: http://cachedavailability-integrations-service/testing org.springframework.web.reactive.function.client.WebClientResponseException$ServiceUnavailable: 503 Service Unavailable from UNKNOWN
at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:336)
Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Error has been observed at the following site(s):
*__checkpoint ⇢ 503 SERVICE_UNAVAILABLE from GET http://cachedavailability-integrations-service/testing [DefaultWebClient]
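For context, as far as we understand it, the first two log lines mean the load balancer received an empty instance list for the service ID, and the filter then answered with a 503 itself instead of forwarding the request. Below is a simplified, stdlib-only model of that selection step. RoundRobinSketch and the use of plain strings for instances are hypothetical stand-ins, not the Spring Cloud implementation.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of round-robin instance selection.
// Not the actual Spring Cloud RoundRobinLoadBalancer.
final class RoundRobinSketch {

    // Counter that advances on every successful selection.
    private final AtomicInteger position = new AtomicInteger();

    // Returns the next instance in rotation, or empty when discovery
    // supplied no instances (the "No servers available" case).
    Optional<String> choose(List<String> instances) {
        if (instances.isEmpty()) {
            return Optional.empty();
        }
        // floorMod keeps the index non-negative even if the counter overflows.
        int index = Math.floorMod(position.getAndIncrement(), instances.size());
        return Optional.of(instances.get(index));
    }
}
```

In this model an empty result says nothing about whether the target is reachable; it only says that the list handed to the selector was empty, which points at discovery rather than connectivity.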
Consul interface
We have already tried many ways, such as:
Sample
Main Class
@org.springframework.cloud.client.discovery.EnableDiscoveryClient
public class MainApplication {...}
WebClient config
@Bean(name = "webClientConsulAvailability")
public WebClient webClientConsulAvailability(
        WebClient.Builder webClientBuilder,
        ReactorLoadBalancerExchangeFilterFunction lbFunction,
        ExchangeFilterFunction logFilter
) {
    return webClientBuilder
            .filter(lbFunction)
            .filter(logFilter)
            .build();
}
bootstrap.yml
spring:
  application:
    name: si-manager-service
  profiles:
    active: ${SPRING_PROFILES_ACTIVE:local}
  cloud:
    consul:
      host: localhost
      port: 8500
      enabled: true
      discovery:
        serviceName: ${spring.application.name}
        instanceId: ${spring.application.name}8500
        enabled: true
        # Register as a service in consul.
        register: true
        registerHealthCheck: true
Dependencies
Consul version: v1.15.3
Usage example:
webClientConsulAvailability.get()
        .uri("http://cachedavailability-integrations-service/testing")
        .retrieve()
        .bodyToFlux(MyDTO.class)
        .doOnError(e -> {
            if (isErrorLogLevel(e)) {
                log.error(COMMUNICATION_ERROR_WITH_URI + uri, e);
            } else {
                log.warn(COMMUNICATION_ERROR_WITH_URI + uri, e);
            }
        })
        .onErrorResume(e -> Flux.empty());
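Fixed with the configuration below. The key change is injecting DeferringLoadBalancerExchangeFilterFunction<LoadBalancedExchangeFilterFunction> instead of ReactorLoadBalancerExchangeFilterFunction and registering it as a filter on the WebClient.Builder.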
package xpto;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.reactive.DeferringLoadBalancerExchangeFilterFunction;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancedExchangeFilterFunction;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.netty.http.client.HttpClient;
import java.util.concurrent.TimeUnit;

@Configuration
@EnableDiscoveryClient
@Slf4j
public class WebclientConfiguration {

    private final ObjectMapper objectMapper;
    private final int webClientReadTimeout;
    private final int webClientConnectionTimeout;

    // Timeouts are injected through the constructor, so the fields need no @Value of their own.
    public WebclientConfiguration(ObjectMapper objectMapper,
                                  @Value("${web.client.read-timeout:25000}") int webClientReadTimeout,
                                  @Value("${web.client.connection-timeout:3000}") int webClientConnectionTimeout) {
        this.objectMapper = objectMapper;
        this.webClientReadTimeout = webClientReadTimeout;
        this.webClientConnectionTimeout = webClientConnectionTimeout;
    }

    // Reactor Netty connector with compression, a connect timeout and a read timeout.
    private ClientHttpConnector getClientHttpConnector() {
        return new ReactorClientHttpConnector(
                HttpClient.create()
                        .compress(true)
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, webClientConnectionTimeout)
                        .doOnConnected(conn -> conn.addHandlerLast(
                                new ReadTimeoutHandler(webClientReadTimeout, TimeUnit.MILLISECONDS))));
    }

    @Bean
    public DefaultUriBuilderFactory builderFactory() {
        DefaultUriBuilderFactory factory = new DefaultUriBuilderFactory();
        factory.setEncodingMode(DefaultUriBuilderFactory.EncodingMode.VALUES_ONLY);
        return factory;
    }

    // Load-balanced client: service IDs in the URI host are resolved through discovery.
    @Bean(name = "webClientConsul")
    public WebClient webClientConsul(
            WebClient.Builder webClientBuilder,
            DeferringLoadBalancerExchangeFilterFunction<LoadBalancedExchangeFilterFunction> exchangeFilterFunction
    ) {
        webClientBuilder.filter(exchangeFilterFunction);
        return buildWebClient(webClientBuilder);
    }

    // Plain client without load balancing, for regular host names.
    @Bean(name = "webClientDefault")
    public WebClient webClientDefault(WebClient.Builder webClientBuilder) {
        return buildWebClient(webClientBuilder);
    }

    private WebClient buildWebClient(WebClient.Builder webClientBuilder) {
        ClientHttpConnector connector = getClientHttpConnector();
        return webClientBuilder
                .clientConnector(connector)
                .exchangeStrategies(getExchangeStrategies())
                .build();
    }

    // JSON encoder and decoder backed by the application's ObjectMapper.
    private ExchangeStrategies getExchangeStrategies() {
        return ExchangeStrategies.builder()
                .codecs(clientDefaultCodecsConfigurer -> {
                    clientDefaultCodecsConfigurer
                            .defaultCodecs()
                            .jackson2JsonEncoder(
                                    new Jackson2JsonEncoder(objectMapper, MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_STREAM_JSON));
                    clientDefaultCodecsConfigurer
                            .defaultCodecs()
                            .jackson2JsonDecoder(
                                    new Jackson2JsonDecoder(objectMapper, MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_STREAM_JSON));
                }).build();
    }
}
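
As far as I understand it, the deferring variant does not resolve the actual load-balanced filter when the bean is created; it looks it up the first time a request passes through it. I have not verified that this is the reason the swap fixed the problem. A rough, stdlib-only model of the idea follows. DeferringFilterSketch, the Supplier-based lookup and the String-based request are hypothetical stand-ins, not Spring Cloud classes.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical model of a filter that defers resolving its delegate
// until the first request. Not the Spring Cloud implementation.
final class DeferringFilterSketch implements Function<String, String> {

    // Stand-in for whatever provides the real filter once the context is ready.
    private final Supplier<Function<String, String>> delegateProvider;

    // Resolved lazily on first use. This sketch is not thread-safe.
    private Function<String, String> delegate;

    DeferringFilterSketch(Supplier<Function<String, String>> delegateProvider) {
        this.delegateProvider = delegateProvider;
    }

    @Override
    public String apply(String request) {
        if (delegate == null) {
            // The lookup happens here, not in the constructor.
            delegate = delegateProvider.get();
        }
        return delegate.apply(request);
    }
}
```

In this model, constructing the filter never touches the provider, so the delegate only has to exist by the time the first request is sent.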