spring-messaging rsocket rsocket-java

Correct usage of LoadbalanceRSocketClient with Spring's RSocketRequester


I'm trying to understand the correct configuration and usage pattern of LoadbalanceRSocketClient in the context of a Spring Boot application (RSocketRequester).

I have two RSocket server backends (Spring Boot, RSocket messaging) running, and I configure the RSocketRequester on the client side like this:

List<LoadbalanceTarget> servers = new ArrayList<>();
for (String url: backendUrls) {
  HttpClient httpClient = HttpClient.create()
    .baseUrl(url)
    .secure(ssl -> 
       ssl.sslContext(SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE)));
  servers.add(LoadbalanceTarget.from(url, WebsocketClientTransport.create(httpClient, url)));
}

// RSocketRequester.Builder is autowired by Spring boot
RSocketRequester requester = builder
  .setupRoute("/connect")
  .setupData("test")
  //.rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(60, Duration.ofSeconds(1))))
  .transports(Flux.just(servers), new RoundRobinLoadbalanceStrategy());

Once configured, the requester is used repeatedly from a timer loop, as follows:

@Scheduled(fixedDelay = 10000, initialDelay = 1000)
public void timer() {
  requester.route("/foo").data(Data).send().block();
}

It works: the client starts, connects to one of the servers, and pushes messages to it. If I kill the server that the client is connected to, the client reconnects to the other server on the next timer event. However, if I then start the first server again and kill the second one, the client doesn't connect anymore and the following exception is observed on the client side:

java.util.concurrent.CancellationException: Pool is exhausted
    at io.rsocket.loadbalance.RSocketPool.select(RSocketPool.java:202) ~[rsocket-core-1.1.0.jar:na]
    at io.rsocket.loadbalance.LoadbalanceRSocketClient.lambda$fireAndForget$0(LoadbalanceRSocketClient.java:49) ~[rsocket-core-1.1.0.jar:na]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:61) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]
    at reactor.core.publisher.Mono.block(Mono.java:1678) ~[reactor-core-3.4.0.jar:3.4.0]

I suspect that I'm either not configuring the requester correctly or not using it properly. I would appreciate any hints, as documentation and tests seem to be pretty thin in this area.

Ideally, I would want the client to transparently switch to the next available server upon server/connectivity failure. Right now a reconnection attempt seems to happen only on the next call to the timer() method, which is not ideal, as the client needs to handle incoming messages from the server. Another thing I observed is that even though "/foo" is a FnF (fire-and-forget) route, unless I call block() after send(), the server never receives the call.


Solution

  • Update Endpoints List Continuously

    LoadbalanceRSocketClient is designed to be integrated with a discovery service, which is responsible for keeping the list of live instances. If one of the services disappears from the cluster, the discovery service updates its list of available instances.

    On the other hand, to implement client-side load balancing, we have to know the list of available services in the cluster. The obvious way to set up load balancing is to retrieve the list of services and supply it to the load balancer API:

    ReactiveDiscoveryClient discoveryClient = ...
    
    Mono<List<LoadbalanceTarget>> serversMono = discoveryClient
        .getInstances(serviceGroupName)
        .map(si -> {
            // getUri() returns a java.net.URI; baseUrl and LoadbalanceTarget.from expect a String
            String uri = si.getUri().toString();
            HttpClient httpClient = HttpClient.create()
              .baseUrl(uri)
              .secure(ssl -> ssl.sslContext(
                  SslContextBuilder.forClient()
                             .trustManager(InsecureTrustManagerFactory.INSTANCE)
              ));
            return LoadbalanceTarget.from(uri, WebsocketClientTransport.create(httpClient, "/rsocket"));
        })
        .collectList();
    
    // RSocketRequester.Builder is autowired by Spring boot
    RSocketRequester requester = builder
      .setupRoute("/connect")
      .setupData("test")
      .transports(serversMono.flux(), new RoundRobinLoadbalanceStrategy());   
    
    

    However, imagine that we are in a fully distributed environment, where every service that disappears and reappears runs on a completely new host and port (e.g. a Kubernetes cluster, which does not stick to a particular IP address). Load balancing has to account for this scenario, so to avoid dead nodes in the pool it removes unhealthy nodes from the pool completely.

    Now, if all the nodes disappear and reappear after some time, they are no longer included in the pool. And if the Flux that provides updates has completed, the pool is effectively exhausted, because no new update will come in from the Flux<List<LoadbalanceTarget>>.

    However, the nodes register themselves with the discovery service again and become available for observation. Therefore, we have to periodically pull information from the discovery service to stay up to date and update the pool state continuously:

    ReactiveDiscoveryClient discoveryClient = ...
    
    Flux<List<LoadbalanceTarget>> serversFlux = discoveryClient
        .getInstances(serviceGroupName)
        .map(si -> {
            // getUri() returns a java.net.URI; baseUrl and LoadbalanceTarget.from expect a String
            String uri = si.getUri().toString();
            HttpClient httpClient = HttpClient.create()
              .baseUrl(uri)
              .secure(ssl -> ssl.sslContext(
                  SslContextBuilder.forClient()
                             .trustManager(InsecureTrustManagerFactory.INSTANCE)
              ));
            return LoadbalanceTarget.from(uri, WebsocketClientTransport.create(httpClient, "/rsocket"));
        })
        .collectList()
        .repeatWhen(f -> f.delayElements(Duration.ofSeconds(1))); // <- continuously retrieve a new List of ServiceInstances
    
    // RSocketRequester.Builder is autowired by Spring boot
    RSocketRequester requester = builder
      .setupRoute("/connect")
      .setupData("test")
      .transports(serversFlux, new RoundRobinLoadbalanceStrategy());
    

    With such a setup, the RSocketPool will not be exhausted if all the nodes disappear from the cluster, because the Flux<List<LoadbalanceTarget>> has not completed and may eventually provide new updates.

    Note that the implementation is smart enough to keep active nodes on every update from the discovery service. If a service instance is already in the pool, you will not get two connections to it at the same time.
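
    The pool behaviour described above can be illustrated with a small toy model. This is only a sketch in plain Java, with no Reactor or RSocket dependency so that it runs on its own; ToyPool, Connection, and all method names are hypothetical and are not part of rsocket-java, and the real RSocketPool is considerably more involved. It shows the two points made above: an update reuses connections whose key is still listed, and once the source of updates has completed and every node is gone, selection fails with "Pool is exhausted".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;

// Toy model only: this is NOT the rsocket-java RSocketPool implementation.
final class ToyPool {

    // Hypothetical stand-in for a live connection to one target.
    static final class Connection {
        final String key;
        Connection(String key) { this.key = key; }
    }

    private final Map<String, Connection> active = new LinkedHashMap<>();
    private boolean sourceCompleted;
    private int next;

    // One update from the target source: connections whose key is still
    // listed are reused, unlisted ones are dropped, new keys get a connection.
    void update(List<String> targetKeys) {
        Map<String, Connection> refreshed = new LinkedHashMap<>();
        for (String key : targetKeys) {
            Connection existing = active.get(key);
            refreshed.put(key, existing != null ? existing : new Connection(key));
        }
        active.clear();
        active.putAll(refreshed);
    }

    // The source of updates has completed: update() will never be called again.
    void completeSource() { sourceCompleted = true; }

    // A lost connection is removed from the pool entirely.
    void connectionLost(String key) { active.remove(key); }

    Connection get(String key) { return active.get(key); }

    // Round-robin selection over the currently active connections.
    Connection select() {
        if (active.isEmpty()) {
            if (sourceCompleted) {
                throw new CancellationException("Pool is exhausted");
            }
            // Simplification: a real pool could wait for the next update instead.
            throw new IllegalStateException("No targets yet");
        }
        List<Connection> snapshot = new ArrayList<>(active.values());
        return snapshot.get(Math.floorMod(next++, snapshot.size()));
    }
}

final class ToyPoolDemo {
    public static void main(String[] args) {
        ToyPool pool = new ToyPool();
        pool.update(List.of("server-1", "server-2"));
        ToyPool.Connection first = pool.get("server-1");

        // server-2 is replaced by server-3; server-1 keeps its connection.
        pool.update(List.of("server-1", "server-3"));
        System.out.println(pool.get("server-1") == first); // prints "true"

        // A one-shot source (like Flux.just) completes, then every node dies.
        pool.completeSource();
        pool.connectionLost("server-1");
        pool.connectionLost("server-3");
        try {
            pool.select();
        } catch (CancellationException e) {
            System.out.println(e.getMessage()); // prints "Pool is exhausted"
        }
    }
}
```

    In this model, the configuration from the question corresponds to calling update once and then completeSource, which is why restarting a server does not bring it back into the pool.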

    Side note on the reconnect feature

    You may notice that RSocketConnector provides a great feature called .reconnect. At first glance, it may seem that using reconnect will keep your connection up and running indefinitely. Unfortunately, that is not true. The .reconnect feature is designed to make your Mono<RSocket> reusable with cache semantics, which means that you may create a @Bean Mono<RSocket> ..., autowire it in various places, and subscribe multiple times without worrying that the resulting RSocket instance will be different on every Mono<RSocket>.subscribe. In addition, with .reconnect, if the given RSocket becomes disconnected (e.g. the connection is lost), the next subscription to such a Mono<RSocket> will re-establish a new RSocket only once for all concurrent .subscribe calls.

    Though it sounds like a useful feature, in RSocketPool we do not rely on it much and use the Mono<RSocket> only once, to resolve and cache an RSocket instance inside RSocketPool. If that RSocket is disconnected, we will not try to subscribe to the given Mono<RSocket> again (we assume that the host and port will have changed).
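
    The cache semantics of .reconnect described above can be pictured with another toy model. Again, this is a plain-Java sketch under stated assumptions: CachedConnection, get, and markDisconnected are hypothetical names, no networking is involved, and this is not how RSocketConnector is implemented. The point it illustrates is that a new connection is only established when someone asks for it again after a loss; nothing reconnects in the background.

```java
import java.util.function.Supplier;

// Toy model of "cache until disconnected" semantics; it is NOT how
// RSocketConnector is implemented and involves no networking.
final class CachedConnection<T> {
    private final Supplier<T> connector; // hypothetical "open a connection" step
    private T current;

    CachedConnection(Supplier<T> connector) { this.connector = connector; }

    // Plays the role of subscribing to the Mono: connects on first use,
    // afterwards every caller receives the same cached instance.
    synchronized T get() {
        if (current == null) {
            current = connector.get();
        }
        return current;
    }

    // Plays the role of a lost connection: the cached instance is dropped,
    // but nothing reconnects until someone calls get() again.
    synchronized void markDisconnected(T lost) {
        if (current == lost) {
            current = null;
        }
    }
}

final class CachedConnectionDemo {
    public static void main(String[] args) {
        int[] attempts = {0};
        CachedConnection<Object> cached = new CachedConnection<>(() -> {
            attempts[0]++;
            return new Object();
        });

        Object first = cached.get();
        System.out.println(cached.get() == first); // prints "true"

        cached.markDisconnected(first);
        System.out.println(attempts[0]); // prints "1": no reconnect has happened yet

        Object second = cached.get();
        System.out.println(second != first); // prints "true"
        System.out.println(attempts[0]); // prints "2"
    }
}
```

    Since the pool resolves its Mono<RSocket> only once, as explained above, the second get() in this model never happens there, which is why .reconnect alone does not bring a lost target back.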