java, spring-boot, netflix-eureka, netflix-zuul, gateway

RedirectingEurekaHttpClient Request execution error


I have a gateway application with a customized load-balancing rule. Here is the code, which follows the official Spring Cloud documentation:

/**
 * Declares {@link CustomizedRibbonConfig} as the {@code defaultConfiguration} of {@code @RibbonClients}.
 */
@RibbonClients(defaultConfiguration = CustomizedRibbonConfig.class)
public class RibbonClientConfiguration {

    /**
     * A {@link ConfigurationBasedServerList} that initializes itself from the supplied client
     * configuration as soon as it is constructed.
     */
    public static class BazServiceList extends ConfigurationBasedServerList {

        /**
         * @param config the client configuration handed to {@code initWithNiwsConfig}
         */
        public BazServiceList(final IClientConfig config) {
            super.initWithNiwsConfig(config);
        }
    }
}

/**
 * Bean definitions for the load-balancing rule and the server-list updater.
 */
@Configuration
class CustomizedRibbonConfig {

    /**
     * @return a new {@link MetadataAwareRule}, exposed as the {@link IRule} bean
     */
    @Bean
    public IRule ribbonRule() {
        final IRule rule = new MetadataAwareRule();
        return rule;
    }

    /**
     * @return a new {@link EurekaNotificationServerListUpdater}, exposed as the
     *         {@link ServerListUpdater} bean
     */
    @Bean
    public ServerListUpdater ribbonServerListUpdater() {
        final ServerListUpdater updater = new EurekaNotificationServerListUpdater();
        return updater;
    }
}
/**
 * Predicate that accepts every server it is asked about.
 */
public class MetadataAwarePredicate extends AbstractDiscoveryEnabledPredicate {

    /**
     * Accepts the candidate unconditionally; the argument is not inspected.
     *
     * @param server the candidate server
     * @return always {@code true}
     */
    @Override
    protected boolean apply(final DiscoveryEnabledServer server) {
        return true;
    }
}
/**
 * Load-balancing rule built on {@link AbstractDiscoveryEnabledRule}; by default it is backed by a
 * {@link MetadataAwarePredicate}.
 */
@Slf4j
public class MetadataAwareRule extends AbstractDiscoveryEnabledRule {
    // NOTE(review): presumably carries the IP of the instance selected for the current thread;
    // the code that writes it is not shown in this listing — confirm against choose().
    public static final ThreadLocal<String> CURRENT_LOAD_BALANCED_SERVICE_IP = new ThreadLocal<>();

    /**
     * Creates a new instance of {@link MetadataAwareRule} backed by a fresh {@link MetadataAwarePredicate}.
     */
    public MetadataAwareRule() {
        this(new MetadataAwarePredicate());
    }

    /**
     * Creates a new instance of {@link MetadataAwareRule} with a specific predicate.
     *
     * @param predicate the predicate, can't be {@code null}
     * @throws IllegalArgumentException if predicate is {@code null}
     */
    public MetadataAwareRule(AbstractDiscoveryEnabledPredicate predicate) {
        super(predicate);
    }


    // Custom server-selection policy; the body is elided in this listing.
    @Override
    public Server choose(Object key) {
       ....my customized choose policy....
}

Here is the problem: I need to refresh the application by firing a RefreshEvent, but doing so leads to a rather strange problem, which may be due to the Eureka or Zuul client version inherited from this parent:

  <parent>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix</artifactId>
    <version>2.2.5.RELEASE</version>
  </parent>

To make the problem easy to reproduce, I reduced the trigger to a simple endpoint, shown below:

    /**
     * Test endpoint that publishes a {@link RefreshEvent} on the application context.
     *
     * @return the value of {@code CommonResult.succeed()}
     */
    @GetMapping("/test/event")
    public CommonResult testRaiseRefreshEvent() {
        final RefreshEvent refreshEvent = new RefreshEvent(this, null, "test to trigger the problem");
        ApplicationContextHolder.getApplicationContext().publishEvent(refreshEvent);
        return CommonResult.succeed();
    }

Once this API is requested, the application performs a refresh.

But sometimes the application throws this exception:

2022-10-19 11:29:32.947 [app:web-gateway,traceId:,spanId:,parentId:] [DiscoveryClient-CacheRefreshExecutor-0] ERROR | RedirectingEurekaHttpClient.java:83 | c.n.d.s.t.d.RedirectingEurekaHttpClient | Request execution error. endpoint=DefaultEndpoint{ serviceUrl='http://localhost:8000/eureka/}
javax.ws.rs.WebApplicationException: com.fasterxml.jackson.core.JsonParseException: processing aborted
 at [Source: (GZIPInputStream); line: 1, column: 18]
    at com.netflix.discovery.provider.DiscoveryJerseyProvider.readFrom(DiscoveryJerseyProvider.java:110)
    at com.sun.jersey.api.client.ClientResponse.getEntity(ClientResponse.java:634)
    at com.sun.jersey.api.client.ClientResponse.getEntity(ClientResponse.java:586)
    at com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient.getApplicationsInternal(AbstractJerseyEurekaHttpClient.java:200)
    at com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient.getApplications(AbstractJerseyEurekaHttpClient.java:167)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$6.execute(EurekaHttpClientDecorator.java:137)
    at com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient.execute(MetricsCollectingEurekaHttpClient.java:73)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getApplications(EurekaHttpClientDecorator.java:134)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$6.execute(EurekaHttpClientDecorator.java:137)
    at com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient.executeOnNewServer(RedirectingEurekaHttpClient.java:118)
    at com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient.execute(RedirectingEurekaHttpClient.java:79)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getApplications(EurekaHttpClientDecorator.java:134)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$6.execute(EurekaHttpClientDecorator.java:137)
    at com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient.execute(RetryableEurekaHttpClient.java:120)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getApplications(EurekaHttpClientDecorator.java:134)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$6.execute(EurekaHttpClientDecorator.java:137)
    at com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient.execute(SessionedEurekaHttpClient.java:77)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getApplications(EurekaHttpClientDecorator.java:134)
    at com.netflix.discovery.DiscoveryClient.getAndStoreFullRegistry(DiscoveryClient.java:1097)
    at com.netflix.discovery.DiscoveryClient.fetchRegistry(DiscoveryClient.java:1011)
    at com.netflix.discovery.DiscoveryClient.<init>(DiscoveryClient.java:440)
    at com.netflix.discovery.DiscoveryClient.<init>(DiscoveryClient.java:282)
    at com.netflix.discovery.DiscoveryClient.<init>(DiscoveryClient.java:278)
    at org.springframework.cloud.netflix.eureka.CloudEurekaClient.<init>(CloudEurekaClient.java:67)
    at org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration$RefreshableEurekaClientConfiguration.eurekaClient(EurekaClientAutoConfiguration.java:316)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:650)
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:635)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1336)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1176)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:556)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:516)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$1(AbstractBeanFactory.java:363)
    at org.springframework.cloud.context.scope.GenericScope$BeanLifecycleWrapper.getBean(GenericScope.java:389)
    at org.springframework.cloud.context.scope.GenericScope.get(GenericScope.java:186)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:360)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)
    at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:192)
    at com.sun.proxy.$Proxy169.getApplications(Unknown Source)
    at org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient.getServices(EurekaDiscoveryClient.java:80)
    at org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClient.getServices(CompositeDiscoveryClient.java:67)
    at org.springframework.cloud.netflix.zuul.filters.discovery.DiscoveryClientRouteLocator.locateRoutes(DiscoveryClientRouteLocator.java:121)
    at org.springframework.cloud.netflix.zuul.filters.discovery.DiscoveryClientRouteLocator.locateRoutes(DiscoveryClientRouteLocator.java:44)
    at org.springframework.cloud.netflix.zuul.filters.SimpleRouteLocator.doRefresh(SimpleRouteLocator.java:186)
    at org.springframework.cloud.netflix.zuul.filters.discovery.DiscoveryClientRouteLocator.refresh(DiscoveryClientRouteLocator.java:171)
    at org.springframework.cloud.netflix.zuul.filters.CompositeRouteLocator.refresh(CompositeRouteLocator.java:78)
    at org.springframework.cloud.netflix.zuul.web.ZuulHandlerMapping.setDirty(ZuulHandlerMapping.java:79)
    at org.springframework.cloud.netflix.zuul.ZuulServerAutoConfiguration$ZuulRefreshListener.reset(ZuulServerAutoConfiguration.java:315)
    at org.springframework.cloud.netflix.zuul.ZuulServerAutoConfiguration$ZuulRefreshListener.resetIfNeeded(ZuulServerAutoConfiguration.java:310)
    at org.springframework.cloud.netflix.zuul.ZuulServerAutoConfiguration$ZuulRefreshListener.onApplicationEvent(ZuulServerAutoConfiguration.java:304)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:404)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:361)
    at org.springframework.cloud.netflix.eureka.CloudEurekaClient.onCacheRefreshed(CloudEurekaClient.java:123)
    at com.netflix.discovery.DiscoveryClient.fetchRegistry(DiscoveryClient.java:1027)
    at com.netflix.discovery.DiscoveryClient.refreshRegistry(DiscoveryClient.java:1533)
    at com.netflix.discovery.DiscoveryClient$CacheRefreshThread.run(DiscoveryClient.java:1500)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.core.JsonParseException: processing aborted
 at [Source: (GZIPInputStream); line: 1, column: 18]
    at com.netflix.discovery.converters.EurekaJacksonCodec$ApplicationsDeserializer.deserialize(EurekaJacksonCodec.java:805)
    at com.netflix.discovery.converters.EurekaJacksonCodec$ApplicationsDeserializer.deserialize(EurekaJacksonCodec.java:791)
    at com.fasterxml.jackson.databind.ObjectReader._unwrapAndDeserialize(ObjectReader.java:2196)
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2054)
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1431)
    at com.netflix.discovery.converters.EurekaJacksonCodec.readValue(EurekaJacksonCodec.java:213)
    at com.netflix.discovery.converters.wrappers.CodecWrappers$LegacyJacksonJson.decode(CodecWrappers.java:314)
    at com.netflix.discovery.provider.DiscoveryJerseyProvider.readFrom(DiscoveryJerseyProvider.java:103)
    ... 69 common frames omitted

and

2022-10-19 11:29:32.956 [app:web-gateway,traceId:,spanId:,parentId:] [DiscoveryClient-CacheRefreshExecutor-0] ERROR | DiscoveryClient.java:1018 | c.netflix.discovery.DiscoveryClient | DiscoveryClient_WEB-GATEWAY/192.168.56.1:web-gateway:8004:NEW_GATEWAY_DEFAULT_GROUP - was unable to refresh its cache! status = Cannot execute request on any known server
com.netflix.discovery.shared.transport.TransportException: Cannot execute request on any known server
    at com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient.execute(RetryableEurekaHttpClient.java:112)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getApplications(EurekaHttpClientDecorator.java:134)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$6.execute(EurekaHttpClientDecorator.java:137)
    at com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient.execute(SessionedEurekaHttpClient.java:77)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getApplications(EurekaHttpClientDecorator.java:134)
    at com.netflix.discovery.DiscoveryClient.getAndStoreFullRegistry(DiscoveryClient.java:1097)
    at com.netflix.discovery.DiscoveryClient.fetchRegistry(DiscoveryClient.java:1011)
    at com.netflix.discovery.DiscoveryClient.<init>(DiscoveryClient.java:440)
    at com.netflix.discovery.DiscoveryClient.<init>(DiscoveryClient.java:282)
    at com.netflix.discovery.DiscoveryClient.<init>(DiscoveryClient.java:278)
    at org.springframework.cloud.netflix.eureka.CloudEurekaClient.<init>(CloudEurekaClient.java:67)
    at org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration$RefreshableEurekaClientConfiguration.eurekaClient(EurekaClientAutoConfiguration.java:316)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:650)
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:635)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1336)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1176)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:556)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:516)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$1(AbstractBeanFactory.java:363)
    at org.springframework.cloud.context.scope.GenericScope$BeanLifecycleWrapper.getBean(GenericScope.java:389)
    at org.springframework.cloud.context.scope.GenericScope.get(GenericScope.java:186)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:360)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)
    at org.springframework.aop.target.SimpleBeanTargetSource.getTarget(SimpleBeanTargetSource.java:35)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:192)
    at com.sun.proxy.$Proxy169.getApplications(Unknown Source)
    at org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient.getServices(EurekaDiscoveryClient.java:80)
    at org.springframework.cloud.client.discovery.composite.CompositeDiscoveryClient.getServices(CompositeDiscoveryClient.java:67)
    at org.springframework.cloud.netflix.zuul.filters.discovery.DiscoveryClientRouteLocator.locateRoutes(DiscoveryClientRouteLocator.java:121)
    at org.springframework.cloud.netflix.zuul.filters.discovery.DiscoveryClientRouteLocator.locateRoutes(DiscoveryClientRouteLocator.java:44)
    at org.springframework.cloud.netflix.zuul.filters.SimpleRouteLocator.doRefresh(SimpleRouteLocator.java:186)
    at org.springframework.cloud.netflix.zuul.filters.discovery.DiscoveryClientRouteLocator.refresh(DiscoveryClientRouteLocator.java:171)
    at org.springframework.cloud.netflix.zuul.filters.CompositeRouteLocator.refresh(CompositeRouteLocator.java:78)
    at org.springframework.cloud.netflix.zuul.web.ZuulHandlerMapping.setDirty(ZuulHandlerMapping.java:79)
    at org.springframework.cloud.netflix.zuul.ZuulServerAutoConfiguration$ZuulRefreshListener.reset(ZuulServerAutoConfiguration.java:315)
    at org.springframework.cloud.netflix.zuul.ZuulServerAutoConfiguration$ZuulRefreshListener.resetIfNeeded(ZuulServerAutoConfiguration.java:310)
    at org.springframework.cloud.netflix.zuul.ZuulServerAutoConfiguration$ZuulRefreshListener.onApplicationEvent(ZuulServerAutoConfiguration.java:304)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:404)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:361)
    at org.springframework.cloud.netflix.eureka.CloudEurekaClient.onCacheRefreshed(CloudEurekaClient.java:123)
    at com.netflix.discovery.DiscoveryClient.fetchRegistry(DiscoveryClient.java:1027)
    at com.netflix.discovery.DiscoveryClient.refreshRegistry(DiscoveryClient.java:1533)
    at com.netflix.discovery.DiscoveryClient$CacheRefreshThread.run(DiscoveryClient.java:1500)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

And no matter how many times I refresh the application again, the gateway never gets a chance to correct its load balancer; no request can go through the gateway because of exceptions like this:

java.lang.RuntimeException: com.netflix.client.ClientException: Load balancer does not have available server for client: web-message-center
    at org.springframework.cloud.openfeign.ribbon.LoadBalancerFeignClient.execute(LoadBalancerFeignClient.java:90)
    at org.springframework.cloud.sleuth.instrument.web.client.feign.TraceLoadBalancerFeignClient.execute(TraceLoadBalancerFeignClient.java:78)
    at feign.SynchronousMethodHandler.executeAndDecode(SynchronousMethodHandler.java:119)
    at feign.SynchronousMethodHandler.invoke(SynchronousMethodHandler.java:89)
    at feign.ReflectiveFeign$FeignInvocationHandler.invoke(ReflectiveFeign.java:100)
    at com.sun.proxy.$Proxy261.sendMessage(Unknown Source)
    at com.wwstation.webgateway.components.GatewayUrlCountProcessor.sendAccessLogWithMq(GatewayUrlCountProcessor.java:221)
    at com.wwstation.webgateway.components.GatewayUrlCountProcessor.run(GatewayUrlCountProcessor.java:82)
    at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.netflix.client.ClientException: Load balancer does not have available server for client: web-message-center
    at com.netflix.loadbalancer.LoadBalancerContext.getServerFromLoadBalancer(LoadBalancerContext.java:483)
    at com.netflix.loadbalancer.reactive.LoadBalancerCommand$1.call(LoadBalancerCommand.java:184)
    at com.netflix.loadbalancer.reactive.LoadBalancerCommand$1.call(LoadBalancerCommand.java:180)
    at rx.Observable.unsafeSubscribe(Observable.java:10327)
    at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:94)
    at rx.internal.operators.OnSubscribeConcatMap.call(OnSubscribeConcatMap.java:42)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.subscribe(Observable.java:10423)
    at rx.Observable.subscribe(Observable.java:10390)
    at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:443)
    at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
    at com.netflix.client.AbstractLoadBalancerAwareClient.executeWithLoadBalancer(AbstractLoadBalancerAwareClient.java:112)
    at org.springframework.cloud.openfeign.ribbon.LoadBalancerFeignClient.execute(LoadBalancerFeignClient.java:83)
    ... 15 common frames omitted

It can be seen from EurekaNotificationServerListUpdater that, at each fetch interval, a thread refreshes the server list. But once I fire a RefreshEvent, that refreshing thread is shut down by the environment refresh (or something else), and no heartbeat is triggered when Eureka's fetch interval is reached again, so my application no longer receives the latest server info from Eureka.

Because of that, there is another problem that can occur after firing a RefreshEvent:

The gateway can still route requests to the target service, but it never gets the latest server list from Eureka. Once the target service goes down, the gateway fails my request instead of telling me that the target service is not online (I have an exception handler that handles "Load balancer does not have available server for client").

These two problems do not occur at the same time: when problem A occurs, problem B does not, and vice versa. Both occur only after a RefreshEvent has been fired.

I have no idea what is going on. Can anyone help me with this, or give me some hints about where the cause might be?


Solution

  • After two days of work, the problem is solved and I have found the cause.

    Using com.netflix.niws.loadbalancer.EurekaNotificationServerListUpdater as-is in a Zuul application always leads to this situation:

    Initially, when the DiscoveryClient fires a Eureka event, the listeners registered by EurekaNotificationServerListUpdater receive it and update the server list, which is the normal behavior.

    But when a RefreshEvent is fired, the application instance re-registers, which causes a new DiscoveryClient instance to be created! This means the new DiscoveryClient no longer holds those listeners.

    In addition, by default EurekaNotificationServerListUpdater uses a singleton DiscoveryClient instance that is never replaced by a RefreshEvent, so the listeners stay attached to the old DiscoveryClient, which is no longer managed by Eureka after the RefreshEvent.

    Because of this, after a RefreshEvent, Eureka's fetch heartbeat no longer triggers the listeners' refresh function, and my gateway fails when some applications go down.

    What I did to fix this problem was to keep a record of those listener instances and re-register them with the new DiscoveryClient once the refresh is done.

    Here is my code:

    /**
     * Keeps track of the current {@link EurekaClient} and of the {@link EurekaEventListener}s that have
     * already been registered with it.
     *
     * <p>Each time an {@link InstanceRegisteredEvent} arrives, the cached client is replaced by the
     * {@link CloudEurekaClient} bean found in the application context and the set of known listeners is
     * emptied, so that subsequent {@link #register(EurekaEventListener)} calls attach the listeners to the
     * new client.
     *
     * <p>The static state is read and written from several threads (callers of {@code register} and the
     * thread delivering application events), so every access is guarded by the class monitor.
     */
    @Configuration
    @Slf4j
    public class RibbonDiscoveryClientListenerManager implements SmartApplicationListener {

        /** Client that listeners are currently registered with; {@code null} until the first event. */
        private static EurekaClient discoveryClient;

        /**
         * Listeners already registered with the current {@link #discoveryClient}.
         */
        private static final CopyOnWriteArraySet<EurekaEventListener> EUREKA_EVENT_LISTENER_SET = new CopyOnWriteArraySet<>();

        /**
         * Registers the listener with the current client if one is available; otherwise only logs a
         * warning so that the caller can try again later.
         *
         * @param listener the listener to attach to the current client
         */
        static synchronized void register(EurekaEventListener listener) {
            if (discoveryClient == null) {
                log.warn("discoveryClient was not found waiting for scheduling...");
                return;
            }
            registerEurekaListener(listener);
            log.debug("discoveryClient update succeed");
        }

        /**
         * Registers the listener with the current client unless it has been registered already.
         *
         * <p>When no client is known yet the call is a no-op and the listener is deliberately not
         * recorded, so a later attempt can still register it.
         *
         * @param listener the listener to attach to the current client
         */
        public static synchronized void registerEurekaListener(EurekaEventListener listener) {
            if (discoveryClient == null) {
                return;
            }
            // add() reports whether the listener was new, which avoids a separate contains() check.
            if (EUREKA_EVENT_LISTENER_SET.add(listener)) {
                discoveryClient.registerEventListener(listener);
            }
        }

        /**
         * Restricts this listener to {@link InstanceRegisteredEvent} and its subtypes.
         */
        @Override
        public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
            return InstanceRegisteredEvent.class.isAssignableFrom(eventType);
        }

        /**
         * Replaces the cached client with the {@link CloudEurekaClient} bean from the application context
         * and forgets all previously registered listeners.
         *
         * @param event the received application event (its content is not inspected)
         */
        @Override
        public void onApplicationEvent(ApplicationEvent event) {
            // Look the new client up before touching shared state, so the bean lookup does not run
            // while the lock is held and other threads never observe a half-updated state.
            EurekaClient refreshedClient = null;
            for (EurekaClient bean : ApplicationContextHolder.getBeans(EurekaClient.class)) {
                if (CloudEurekaClient.class.isAssignableFrom(bean.getClass())) {
                    refreshedClient = bean;
                }
            }

            synchronized (RibbonDiscoveryClientListenerManager.class) {
                // Swap the client and clear the set atomically with respect to register().
                discoveryClient = refreshedClient;
                EUREKA_EVENT_LISTENER_SET.clear();
            }
        }
    }
    

    Here is the customized ServerListUpdater:

    /**
     * A {@link ServerListUpdater} that runs the update action whenever a {@link CacheRefreshedEvent} is
     * received, and that periodically asks {@link RibbonDiscoveryClientListenerManager} to register its
     * listener again.
     *
     * <p>{@link #start(UpdateAction)} and {@link #stop()} are synchronized on this instance. The periodic
     * re-registration task is owned by this instance and is cancelled by {@link #stop()}.
     */
    @Slf4j
    public class RibbonClientEurekaAutoCompensateServerListUpdater implements ServerListUpdater {

        /** Lazily creates the shared default executor on first use. */
        private static class LazyHolder {
            private final static String CORE_THREAD = "EurekaNotificationServerListUpdater.ThreadPoolSize";
            private final static String QUEUE_SIZE = "EurekaNotificationServerListUpdater.queueSize";
            private final static LazyHolder SINGLETON = new LazyHolder();

            private final DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
            private final DynamicIntProperty queueSizeProp = new DynamicIntProperty(QUEUE_SIZE, 1000);
            private final ThreadPoolExecutor defaultServerListUpdateExecutor;
            private final Thread shutdownThread;

            private LazyHolder() {
                int corePoolSize = getCorePoolSize();
                defaultServerListUpdateExecutor = new ThreadPoolExecutor(
                        corePoolSize,
                        corePoolSize * 5,
                        0,
                        TimeUnit.NANOSECONDS,
                        new ArrayBlockingQueue<Runnable>(queueSizeProp.get()),
                        new ThreadFactoryBuilder()
                                .setNameFormat("EurekaNotificationServerListUpdater-%d")
                                .setDaemon(true)
                                .build()
                );

                // Resize the pool whenever the pool-size property changes.
                poolSizeProp.addCallback(new Runnable() {
                    @Override
                    public void run() {
                        int corePoolSize = getCorePoolSize();
                        defaultServerListUpdateExecutor.setCorePoolSize(corePoolSize);
                        defaultServerListUpdateExecutor.setMaximumPoolSize(corePoolSize * 5);
                    }
                });

                shutdownThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        log.info("Shutting down the Executor for EurekaNotificationServerListUpdater");
                        try {
                            defaultServerListUpdateExecutor.shutdown();
                            Runtime.getRuntime().removeShutdownHook(shutdownThread);
                        } catch (Exception e) {
                            // this can happen in the middle of a real shutdown, and that's ok.
                        }
                    }
                });

                Runtime.getRuntime().addShutdownHook(shutdownThread);
            }

            /** Returns the configured pool size, falling back to 2 when the property is not positive. */
            private int getCorePoolSize() {
                int propSize = poolSizeProp.get();
                if (propSize > 0) {
                    return propSize;
                }
                return 2; // default
            }
        }

        /**
         * @return the shared executor used when no executor is passed to the constructor
         */
        public static ExecutorService getDefaultRefreshExecutor() {
            return LazyHolder.SINGLETON.defaultServerListUpdateExecutor;
        }

        /* visible for testing */ final AtomicBoolean updateQueued = new AtomicBoolean(false);
        private final AtomicBoolean isActive = new AtomicBoolean(false);
        private final AtomicLong lastUpdated = new AtomicLong(System.currentTimeMillis());
        private final Provider<EurekaClient> eurekaClientProvider;
        private final ExecutorService refreshExecutor;

        private volatile EurekaEventListener updateListener;
        private volatile EurekaClient eurekaClient;

        /** Scheduler of the periodic re-registration task; only touched inside start()/stop(). */
        private ScheduledThreadPoolExecutor reRegistrationScheduler;

        public RibbonClientEurekaAutoCompensateServerListUpdater() {
            this(new LegacyEurekaClientProvider());
        }

        public RibbonClientEurekaAutoCompensateServerListUpdater(final Provider<EurekaClient> eurekaClientProvider) {
            this(eurekaClientProvider, getDefaultRefreshExecutor());
        }

        public RibbonClientEurekaAutoCompensateServerListUpdater(final Provider<EurekaClient> eurekaClientProvider, ExecutorService refreshExecutor) {
            this.eurekaClientProvider = eurekaClientProvider;
            this.refreshExecutor = refreshExecutor;
        }

        /**
         * Activates the updater: creates the event listener, registers it through
         * {@link RibbonDiscoveryClientListenerManager} and schedules a task that repeats that registration
         * every 10 seconds. Calling it on an already active updater is a no-op.
         *
         * @param updateAction the action to run on every cache-refreshed event
         * @throws IllegalStateException if the provider yields no Eureka client
         */
        @Override
        public synchronized void start(final UpdateAction updateAction) {
            if (!isActive.compareAndSet(false, true)) {
                log.info("Update listener already registered, no-op");
                return;
            }

            final EurekaEventListener listener = newUpdateListener(updateAction);
            this.updateListener = listener;

            if (eurekaClient == null) {
                eurekaClient = eurekaClientProvider.get();
            }
            if (eurekaClient == null) {
                // Roll the flag back so that a later start() is not treated as "already started".
                isActive.set(false);
                log.error("Failed to register an updateListener to eureka client, eureka client is null");
                throw new IllegalStateException("Failed to start the updater, unable to register the update listener due to eureka client being null.");
            }
            RibbonDiscoveryClientListenerManager.register(listener);

            // Periodically repeat the registration so the listener ends up on a replaced client.
            // The scheduler is kept in a field so that stop() can cancel it, and its thread is a daemon
            // so it cannot keep the JVM alive.
            reRegistrationScheduler = new ScheduledThreadPoolExecutor(1,
                    new ThreadFactoryBuilder()
                            .setNameFormat("refreshListenerPool-%d")
                            .setDaemon(true)
                            .build());
            reRegistrationScheduler.scheduleWithFixedDelay(() -> {
                try {
                    RibbonDiscoveryClientListenerManager.register(listener);
                } catch (RuntimeException e) {
                    // An exception escaping a periodic task suppresses all its future executions.
                    log.warn("Failed to re-register the update listener, will retry on the next run", e);
                }
            }, 10, 10, TimeUnit.SECONDS);
        }

        /**
         * Builds the listener that queues at most one pending run of the update action per
         * {@link CacheRefreshedEvent}.
         */
        private EurekaEventListener newUpdateListener(final UpdateAction updateAction) {
            return new EurekaEventListener() {
                @Override
                public void onEvent(EurekaEvent event) {
                    if (!(event instanceof CacheRefreshedEvent)) {
                        return;
                    }
                    if (!updateQueued.compareAndSet(false, true)) {  // if an update is already queued
                        log.info("an update action is already queued, returning as no-op");
                        return;
                    }
                    if (refreshExecutor.isShutdown()) {
                        log.debug("stopping EurekaNotificationServerListUpdater, as refreshExecutor has been shut down");
                        // Nothing was queued, so release the flag before stopping.
                        updateQueued.set(false);
                        stop();
                        return;
                    }
                    try {
                        refreshExecutor.submit(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    updateAction.doUpdate();
                                    lastUpdated.set(System.currentTimeMillis());
                                } catch (Exception e) {
                                    log.warn("Failed to update serverList", e);
                                } finally {
                                    updateQueued.set(false);
                                }
                            }
                        });  // fire and forget
                    } catch (Exception e) {
                        log.warn("Error submitting update task to executor, skipping one round of updates", e);
                        updateQueued.set(false);  // if submit fails, need to reset updateQueued to false
                    }
                }
            };
        }

        /**
         * Deactivates the updater: cancels the periodic re-registration task and unregisters the listener
         * from the Eureka client obtained from the provider. Calling it on an inactive updater is a no-op.
         */
        @Override
        public synchronized void stop() {
            if (isActive.compareAndSet(true, false)) {
                if (reRegistrationScheduler != null) {
                    // Otherwise the task would keep registering the listener after stop().
                    reRegistrationScheduler.shutdownNow();
                    reRegistrationScheduler = null;
                }
                // NOTE(review): registration goes through RibbonDiscoveryClientListenerManager, whose
                // client may be a different instance than eurekaClient — confirm that unregistering
                // here actually detaches the listener.
                if (eurekaClient != null) {
                    eurekaClient.unregisterEventListener(updateListener);
                }
            } else {
                log.info("Not currently active, no-op");
            }
        }

        /**
         * @return the time of the last successful update, formatted with {@link Date#toString()}
         */
        @Override
        public String getLastUpdate() {
            return new Date(lastUpdated.get()).toString();
        }

        /**
         * @return milliseconds elapsed since the last successful update
         */
        @Override
        public long getDurationSinceLastUpdateMs() {
            return System.currentTimeMillis() - lastUpdated.get();
        }

        /**
         * @return always 0
         */
        @Override
        public int getNumberMissedCycles() {
            return 0;
        }

        /**
         * @return the core pool size of the refresh executor while active and when the executor is a
         *         {@link ThreadPoolExecutor}; otherwise 0
         */
        @Override
        public int getCoreThreads() {
            // instanceof is false for null, so no separate null check is needed.
            if (isActive.get() && refreshExecutor instanceof ThreadPoolExecutor) {
                return ((ThreadPoolExecutor) refreshExecutor).getCorePoolSize();
            }
            return 0;
        }
    }
    
    

    Config Class:

    
    /**
     * Declares {@link CustomizedRibbonConfig} as the {@code defaultConfiguration} of {@code @RibbonClients}.
     */
    @RibbonClients(defaultConfiguration = CustomizedRibbonConfig.class)
    public class RibbonClientConfiguration {

        /**
         * A {@link ConfigurationBasedServerList} that initializes itself from the supplied client
         * configuration as soon as it is constructed.
         */
        public static class BazServiceList extends ConfigurationBasedServerList {

            /**
             * @param config the client configuration handed to {@code initWithNiwsConfig}
             */
            public BazServiceList(final IClientConfig config) {
                super.initWithNiwsConfig(config);
            }
        }
    }
    
    /**
     * Bean definitions for the load-balancing rule and the server-list updater.
     */
    @Configuration
    class CustomizedRibbonConfig {

        /** Flag initialized to {@code false}; it is not read or written inside this class. */
        static final AtomicBoolean justRefreshed = new AtomicBoolean(false);

        /**
         * @return a new {@link MetadataAwareRule}, exposed as the {@link IRule} bean
         */
        @Bean
        public IRule ribbonRule() {
            final IRule rule = new MetadataAwareRule();
            return rule;
        }

        /**
         * @return a new {@link RibbonClientEurekaAutoCompensateServerListUpdater}, exposed as the
         *         {@link ServerListUpdater} bean
         */
        @Bean
        public ServerListUpdater ribbonServerListUpdater() {
            final ServerListUpdater updater = new RibbonClientEurekaAutoCompensateServerListUpdater();
            return updater;
        }
    }