javaspring-bootcassandradatastax-java-driverspring-data-cassandra

Unable to connect to the remote Cassandra datacenter with the options provided in error


I'm trying to connect to two of my remote contact points using the Cassandra CqlSession class. I'm using spring boot 3.3.3 and spring-data-cassandra 4.3.3. Below are the datastax driver dependencies used:

<dependency>
   <groupId>org.apache.cassandra</groupId>
   <artifactId>java-driver-core</artifactId>
   <version>4.18.1</version>
</dependency>
<dependency>
   <groupId>org.apache.cassandra</groupId>
   <artifactId>java-driver-query-builder</artifactId>
   <version>4.18.1</version>
</dependency>
<dependency>
   <groupId>org.apache.cassandra</groupId>
   <artifactId>java-driver-mapper-runtime</artifactId>
   <version>4.18.1</version>
</dependency>
<dependency>
   <groupId>org.apache.cassandra</groupId>
   <artifactId>java-driver-mapper-processor</artifactId>
   <version>4.18.1</version>
</dependency>

Below is the Bean configuration I'm using with SSL certificates for connecting to the Cassandra cluster:

@Configuration
public class CassandraClientConfig extends AbstractCassandraConfiguration {
    @Bean("cluster")
    @Primary
    public CqlSession sslSession() throws Exception {

        // Load the certificate and private key into a KeyStore
        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        keyStore.load(null, null);
        SSLContext sslContext;

        try {
            Path tmCertPath = Paths.get(cassandraClient.getAppCertPath());
            Path tmPrivateKeyPath = Paths.get(cassandraClient.getAppKeyPath());
            X509Certificate certificate = kmiDefinitions.loadCertificate(tmCertPath);
            PrivateKey privateKey = kmiDefinitions.loadPrivateKey(tmPrivateKeyPath);

            KeyStore.PrivateKeyEntry keyEntry = new KeyStore.PrivateKeyEntry(privateKey, new Certificate[]{certificate});
            keyStore.setEntry("test_management_secrets", keyEntry, new KeyStore.PasswordProtection("".toCharArray()));

            KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            keyManagerFactory.init(keyStore, "".toCharArray());

            sslContext = SSLContext.getInstance("TLS");
            sslContext.init(keyManagerFactory.getKeyManagers(), null, null);
        } catch (GeneralSecurityException | IOException e) {
            logger.error("Could not create mutual SSL Context!", e);
            throw e;
        }

        return CqlSession.builder()
                .addContactPoint(new InetSocketAddress(cassandraClient.getPrimaryHost(), 9042))
                .addContactPoint(new InetSocketAddress(cassandraClient.getFallbackHost(), 9042))
                .withLocalDatacenter("SEA")
                .withSslContext(sslContext)
                .withKeyspace(cassandraClient.getKeySpace())
                .addRequestTracker(new MultiplexingRequestTracker())
                .build();

    }
}

Below is the custom YAML config I'm using for cassandra hostnames and SSL secrets:

application:
  cassandra-client:
    app-cert-path: some_relative_path.certificate
    app-key-path: some_relative_path.private_key
    key-space: akatest_sqa2
    primary-host: api-beta.caas.dbattery.com
    fallback-host: fallback-beta.caas.dbattery.com

Below is the error I'm facing currently and this is not my local environment, I'm using staging env.

Caused by: com.datastax.oss.driver.api.core.AllNodesFailedException: Could not reach any contact point, make sure you've provided valid addresses (showing first 1 nodes, use getAllErrors() for more): Node(endPoint=localhost/<unresolved>:9042, hostId=null, hashCode=18682432): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (io.netty.channel.StacklessClosedChannelException)]
    at com.datastax.oss.driver.api.core.AllNodesFailedException.copy(AllNodesFailedException.java:143)
    at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:151)
    at com.datastax.oss.driver.api.core.session.SessionBuilder.build(SessionBuilder.java:837)
    at org.springframework.data.cassandra.config.CqlSessionFactoryBean.buildSystemSession(CqlSessionFactoryBean.java:535)
    at org.springframework.data.cassandra.config.CqlSessionFactoryBean.afterPropertiesSet(CqlSessionFactoryBean.java:469)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1853)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1802)
    ... 24 more
    Suppressed: com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (io.netty.channel.StacklessClosedChannelException)
        at com.datastax.oss.driver.internal.core.channel.ProtocolInitHandler$InitRequest.fail(ProtocolInitHandler.java:358)
        at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.writeListener(ChannelHandlerRequest.java:89)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
        at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:185)
        at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
        at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
        at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.send(ChannelHandlerRequest.java:78)
        at com.datastax.oss.driver.internal.core.channel.ProtocolInitHandler$InitRequest.send(ProtocolInitHandler.java:195)
        at com.datastax.oss.driver.internal.core.channel.ProtocolInitHandler.onRealConnect(ProtocolInitHandler.java:126)
        at com.datastax.oss.driver.internal.core.channel.ConnectInitHandler.lambda$connect$0(ConnectInitHandler.java:59)
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
        at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
        at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:118)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:326)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:342)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:840)
        Suppressed: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:9042
        Caused by: java.net.ConnectException: Connection refused
            at java.base/sun.nio.ch.Net.pollConnect(Native Method)
            at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
            at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946)
            at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
            at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:339)
            at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
            at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
            at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
            at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
            at java.base/java.lang.Thread.run(Thread.java:840)
    Caused by: io.netty.channel.StacklessClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0()(Unknown Source)

Initially when I had not explicitly defined the local datacenter and I have encountered the below error:

Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'cluster' defined in class path resource [com/akamai/commons/configs/CassandraClientConfig.class]: Failed to instantiate [com.datastax.oss.driver.api.core.CqlSession]: Factory method 'sslSession' threw exception with message: Since you provided explicit contact points, the local DC must be explicitly set (see basic.load-balancing-policy.local-datacenter in the config, or set it programmatically with SessionBuilder.withLocalDatacenter). Current contact points are: Node(endPoint=api-beta.caas.dbattery.com/23.99.129.142:9042, hostId=ddcf16d5-3ca9-46ad-bce1-935c675245a0, hashCode=2c701135)=SEA, Node(endPoint=fallback-beta.caas.dbattery.com/23.99.129.141:9042, hostId=cbe38c1d-35ab-4f1c-9f4f-002e3ae1dd67, hashCode=1db3dd9b)=SEA. Current DCs in this cluster are: EWR, SEA
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:648)
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:485)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1355)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1185)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
    at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1443)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1353)
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:904)
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:782)
    ... 79 more
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.datastax.oss.driver.api.core.CqlSession]: Factory method 'sslSession' threw exception with message: Since you provided explicit contact points, the local DC must be explicitly set (see basic.load-balancing-policy.local-datacenter in the config, or set it programmatically with SessionBuilder.withLocalDatacenter). Current contact points are: Node(endPoint=api-beta.caas.dbattery.akamai.com/23.54.119.142:9042, hostId=ddcf16d5-3ca9-46ad-bce1-935c675245a0, hashCode=2c701135)=SEA, Node(endPoint=fallback-beta.caas.dbattery.akamai.com/23.54.119.141:9042, hostId=cbe38c1d-35ab-4f1c-9f4f-002e3ae1dd67, hashCode=1db3dd9b)=SEA. Current DCs in this cluster are: EWR, SEA
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:178)
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:644)
    ... 93 more
Caused by: java.lang.IllegalStateException: Since you provided explicit contact points, the local DC must be explicitly set (see basic.load-balancing-policy.local-datacenter in the config, or set it programmatically with SessionBuilder.withLocalDatacenter). Current contact points are: Node(endPoint=api-beta.caas.dbattery.akamai.com/23.54.119.142:9042, hostId=ddcf16d5-3ca9-46ad-bce1-935c675245a0, hashCode=2c701135)=SEA, Node(endPoint=fallback-beta.caas.dbattery.akamai.com/23.54.119.141:9042, hostId=cbe38c1d-35ab-4f1c-9f4f-002e3ae1dd67, hashCode=1db3dd9b)=SEA. Current DCs in this cluster are: EWR, SEA
    at com.datastax.oss.driver.internal.core.loadbalancing.helper.MandatoryLocalDcHelper.discoverLocalDc(MandatoryLocalDcHelper.java:93)
    at com.datastax.oss.driver.internal.core.loadbalancing.DefaultLoadBalancingPolicy.discoverLocalDc(DefaultLoadBalancingPolicy.java:122)
    at com.datastax.oss.driver.internal.core.loadbalancing.BasicLoadBalancingPolicy.init(BasicLoadBalancingPolicy.java:170)
    at com.datastax.oss.driver.internal.core.metadata.LoadBalancingPolicyWrapper.init(LoadBalancingPolicyWrapper.java:123)
    at com.datastax.oss.driver.internal.core.session.DefaultSession$SingleThreaded.initializePools(DefaultSession.java:475)
    at com.datastax.oss.driver.internal.core.session.DefaultSession$SingleThreaded.lambda$init$5(DefaultSession.java:393)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
    at com.datastax.oss.driver.internal.core.metadata.MetadataManager$SingleThreaded.lambda$startSchemaRequest$1(MetadataManager.java:450)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:614)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:653)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:840)

NOW, I'm not sure what exactly is the issue, when I specify the exact data center as specified in the error, it connects to localhost rather and we do NOT have a local cassandra running in the staging env. I'm new to using Cassandra, please help me resolve this issue. Also, let me know if any additional info is needed.

I have tried setting datacenter values for each contact point as below:

CqlSession.builder()
      .addContactPoint(new InetSocketAddress(cassandraClient.getPrimaryHost(), 9042))
      .withLocalDatacenter("EWR")
      .addContactPoint(new InetSocketAddress(cassandraClient.getFallbackHost(), 9042))
      .withLocalDatacenter("SEA")
      .withSslContext(sslContext)
      .withKeyspace(cassandraClient.getKeySpace())
      .addRequestTracker(new MultiplexingRequestTracker())
      .build();

With above config, I was getting below error:

Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'cluster' defined in class path resource [com/akamai/commons/configs/CassandraClientConfig.class]: Failed to instantiate [com.datastax.oss.driver.api.core.CqlSession]: Factory method 'sslSession' threw exception with message: Multiple entries with same key: default=EWR and default=SEA
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:648)
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:485)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1355)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1185)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
    at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1443)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1353)
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:904)
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:782)

Since, I didn't have any local cassandra in the staging, I also tried using an alternative Loadbalancing policy as specified here but even with this, I'm getting the below exception where it tries to connect to localhost:

Caused by: com.datastax.oss.driver.api.core.AllNodesFailedException: Could not reach any contact point, make sure you've provided valid addresses (showing first 1 nodes, use getAllErrors() for more): Node(endPoint=localhost/<unresolved>:9042, hostId=null, hashCode=5ad2d127): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (io.netty.channel.StacklessClosedChannelException)]
    at com.datastax.oss.driver.api.core.AllNodesFailedException.copy(AllNodesFailedException.java:143)
    at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:151)
    at com.datastax.oss.driver.api.core.session.SessionBuilder.build(SessionBuilder.java:837)
    at org.springframework.data.cassandra.config.CqlSessionFactoryBean.buildSystemSession(CqlSessionFactoryBean.java:535)
    at org.springframework.data.cassandra.config.CqlSessionFactoryBean.afterPropertiesSet(CqlSessionFactoryBean.java:469)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1853)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1802)

Thanks in advance for any help.


Solution

  • Figured out that extending the class AbstractCassandraConfiguration was causing this issue. Once I removed that from the Configuration class, it started contacting the endpoints which are explicitly configured.

    The class AbstractCassandraConfiguration in turn extends another abstract class - AbstractSessionConfiguration. The class AbstractSessionConfiguration has a bean configured with the name cassandraSession which has basic configurations and connects to localhost as a basic contact-point as shown below:

    
        @Bean
        public CqlSessionFactoryBean cassandraSession() {
    
            CqlSessionFactoryBean bean = new CqlSessionFactoryBean();
    
            bean.setContactPoints(getContactPoints());
            bean.setKeyspaceCreations(getKeyspaceCreations());
            bean.setKeyspaceDrops(getKeyspaceDrops());
            bean.setKeyspaceName(getKeyspaceName());
            bean.setKeyspaceStartupScripts(getStartupScripts());
            bean.setKeyspaceShutdownScripts(getShutdownScripts());
            bean.setLocalDatacenter(getLocalDataCenter());
            bean.setPort(getPort());
            bean.setSessionBuilderConfigurer(getSessionBuilderConfigurerWrapper());
    
            return bean;
        }
    
        protected String getContactPoints() {
            return CqlSessionFactoryBean.DEFAULT_CONTACT_POINTS;
        }
        
    
    

    This value CqlSessionFactoryBean.DEFAULT_CONTACT_POINTS is initialized as a static constant

    public static final String DEFAULT_CONTACT_POINTS = "localhost";