java spring-boot spring-cloud consul spring-cloud-consul

Service Discovery not working when using WebClient with Consul


We're migrating from Spring Boot 2 to Spring Boot 3 and also getting rid of Netflix Ribbon. We're experiencing an issue while trying to discover a service using Consul. If we roll back to Spring Boot 2 + Netflix Ribbon, it works with no problem, so we have ruled out any connectivity issue.

Logging errors:

RoundRobinLoadBalancer|No servers available for service: cachedavailability-integrations-service
ReactorLoadBalancerExchangeFilterFunction|LoadBalancer does not contain an instance for the service cachedavailability-integrations-service
Communication error with uri: http://cachedavailability-integrations-service/testing org.springframework.web.reactive.function.client.WebClientResponseException$ServiceUnavailable: 503 Service Unavailable from UNKNOWN 
    at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:336)
    Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ 503 SERVICE_UNAVAILABLE from GET http://cachedavailability-integrations-service/testing [DefaultWebClient]

Consul interface image

  1. The service "si-manager" is the one trying to discover and access the "cachedavailability" one.
  2. It registers itself correctly, but it does not discover the other registered services when using the WebClient bean.

We have already tried many ways, such as:

  1. https://docs.spring.io/spring-cloud-consul/docs/current/reference/html/#using-the-discoveryclient
  2. Service discovery with spring webflux WebClient
  3. Configuring spring-cloud loadbalancer without autoconfiguration

Sample

Main Class

@org.springframework.cloud.client.discovery.EnableDiscoveryClient
public class MainApplication {...}

WebClient config

  /**
   * Builds the {@code webClientConsulAvailability} WebClient from the injected builder.
   * The load-balancer filter is registered first and the logging filter second.
   */
  @Bean(name = "webClientConsulAvailability")
  public WebClient webClientConsulAvailability(
    WebClient.Builder webClientBuilder,
    ReactorLoadBalancerExchangeFilterFunction lbFunction,
    ExchangeFilterFunction logFilter
  ) {
    // Same registration order as before: load balancer, then logging.
    WebClient.Builder withLoadBalancer = webClientBuilder.filter(lbFunction);
    WebClient.Builder withLogging = withLoadBalancer.filter(logFilter);
    return withLogging.build();
  }

bootstrap.yml

# Settings for the si-manager-service application and its Consul integration.
spring:
  application:
    name: si-manager-service
  profiles:
    # Active profile is taken from SPRING_PROFILES_ACTIVE, falling back to "local".
    active: ${SPRING_PROFILES_ACTIVE:local}
  cloud:
    consul:
      host: localhost
      port: 8500
      enabled: true
      discovery:
        # Service name reuses the application name defined above.
        serviceName: ${spring.application.name}
        # Instance id is the application name with the literal suffix "8500" appended.
        instanceId: ${spring.application.name}8500
        enabled: true
        # Register as a service in consul.
        register: true
        registerHealthCheck: true

Dependencies image

Consul version: v1.15.3

Usage example:

// Issues a GET to /testing using the logical service name as the host (no explicit host/port)
// and decodes the response body as a stream of MyDTO.
webClientConsulAvailability.get()
      .uri("http://cachedavailability-integrations-service/testing")
      .retrieve()
      .bodyToFlux(MyDTO.class)
      // Log the failure at ERROR or WARN, depending on what isErrorLogLevel(e) returns.
      .doOnError(e -> {
        if (isErrorLogLevel(e)) {
          log.error(COMMUNICATION_ERROR_WITH_URI + uri, e);
        } else {
          log.warn(COMMUNICATION_ERROR_WITH_URI + uri, e);
        }
      })
      // Any error is replaced by an empty Flux, so subscribers never observe the failure itself.
      .onErrorResume(e -> Flux.empty());

Solution

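  • Fixed with the code below. The key change is that the load-balanced WebClient now takes a `DeferringLoadBalancerExchangeFilterFunction<LoadBalancedExchangeFilterFunction>` as its filter instead of injecting `ReactorLoadBalancerExchangeFilterFunction` directly.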

    package xpto;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.netty.channel.ChannelOption;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
    import org.springframework.cloud.client.loadbalancer.reactive.DeferringLoadBalancerExchangeFilterFunction;
    import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancedExchangeFilterFunction;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.MediaType;
    import org.springframework.http.client.reactive.ClientHttpConnector;
    import org.springframework.http.client.reactive.ReactorClientHttpConnector;
    import org.springframework.http.codec.json.Jackson2JsonDecoder;
    import org.springframework.http.codec.json.Jackson2JsonEncoder;
    import org.springframework.web.reactive.function.client.ExchangeStrategies;
    import org.springframework.web.reactive.function.client.WebClient;
    import org.springframework.web.util.DefaultUriBuilderFactory;
    import reactor.netty.http.client.HttpClient;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * WebClient configuration exposing two beans that share the same connector and codec setup:
     * <ul>
     *   <li>{@code webClientConsul} - additionally registers the injected load-balancer exchange
     *       filter on the builder;</li>
     *   <li>{@code webClientDefault} - a plain client with no load-balancer filter.</li>
     * </ul>
     * Timeouts are read from {@code web.client.read-timeout} and
     * {@code web.client.connection-timeout} (both in milliseconds).
     */
    @Configuration
    @EnableDiscoveryClient
    @Slf4j
    public class WebclientConfiguration {

      // All three fields are populated exclusively through the constructor. The timeout fields
      // deliberately carry no field-level @Value: they are final and already constructor-injected,
      // so a second annotation would only duplicate the property expression.
      private final ObjectMapper objectMapper;
      private final int webClientReadTimeout;
      private final int webClientConnectionTimeout;

      /**
       * @param objectMapper               mapper used by the JSON encoder and decoder of both clients
       * @param webClientReadTimeout       read timeout in milliseconds (default 25000)
       * @param webClientConnectionTimeout connect timeout in milliseconds (default 3000)
       */
      public WebclientConfiguration(ObjectMapper objectMapper,
        @Value("${web.client.read-timeout:25000}") int webClientReadTimeout,
        @Value("${web.client.connection-timeout:3000}") int webClientConnectionTimeout) {
        this.objectMapper = objectMapper;
        this.webClientReadTimeout = webClientReadTimeout;
        this.webClientConnectionTimeout = webClientConnectionTimeout;
      }

      /**
       * Creates a Reactor Netty connector with compression enabled, the configured connect
       * timeout, and a {@link ReadTimeoutHandler} added to each connection once it is established.
       */
      private ClientHttpConnector getClientHttpConnector() {
        HttpClient httpClient = HttpClient.create()
          .compress(true)
          .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, webClientConnectionTimeout)
          .doOnConnected(conn ->
            conn.addHandlerLast(new ReadTimeoutHandler(webClientReadTimeout, TimeUnit.MILLISECONDS)));
        return new ReactorClientHttpConnector(httpClient);
      }

      /**
       * Exposes a {@link DefaultUriBuilderFactory} whose encoding mode is
       * {@link DefaultUriBuilderFactory.EncodingMode#VALUES_ONLY}.
       */
      @Bean
      public DefaultUriBuilderFactory builderFactory() {
        DefaultUriBuilderFactory factory = new DefaultUriBuilderFactory();
        factory.setEncodingMode(DefaultUriBuilderFactory.EncodingMode.VALUES_ONLY);
        return factory;
      }

      /**
       * WebClient intended for calls addressed by service name: the injected deferring
       * load-balancer filter is registered on the builder before the shared setup is applied.
       *
       * @param webClientBuilder       builder supplied by the container
       * @param exchangeFilterFunction load-balancer exchange filter supplied by the container
       * @return the configured client
       */
      @Bean(name = "webClientConsul")
      public WebClient webClientConsul(
        WebClient.Builder webClientBuilder,
        DeferringLoadBalancerExchangeFilterFunction<LoadBalancedExchangeFilterFunction> exchangeFilterFunction
      ) {
        webClientBuilder.filter(exchangeFilterFunction);
        return buildWebClient(webClientBuilder);
      }

      /**
       * WebClient with the shared connector and codecs but without any load-balancer filter.
       *
       * @param webClientBuilder builder supplied by the container
       * @return the configured client
       */
      @Bean(name = "webClientDefault")
      public WebClient webClientDefault(WebClient.Builder webClientBuilder) {
        return buildWebClient(webClientBuilder);
      }

      /** Applies the shared connector and exchange strategies to the builder and builds the client. */
      private WebClient buildWebClient(WebClient.Builder webClientBuilder) {
        return webClientBuilder
          .clientConnector(getClientHttpConnector())
          .exchangeStrategies(getExchangeStrategies())
          .build();
      }

      /**
       * Builds exchange strategies whose Jackson JSON encoder and decoder use the injected
       * {@link ObjectMapper} for JSON, event-stream and stream-JSON media types.
       */
      private ExchangeStrategies getExchangeStrategies() {
        return ExchangeStrategies.builder()
          .codecs(codecConfigurer -> {
            codecConfigurer
              .defaultCodecs()
              .jackson2JsonEncoder(
                new Jackson2JsonEncoder(objectMapper, MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_STREAM_JSON));
            codecConfigurer
              .defaultCodecs()
              .jackson2JsonDecoder(
                new Jackson2JsonDecoder(objectMapper, MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_STREAM_JSON));
          })
          .build();
      }

    }