I'm trying to connect to two remote contact points using the Cassandra CqlSession class. I'm using Spring Boot 3.3.3 and spring-data-cassandra 4.3.3. Below are the DataStax driver dependencies used:
<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>java-driver-core</artifactId>
    <version>4.18.1</version>
</dependency>
<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>java-driver-query-builder</artifactId>
    <version>4.18.1</version>
</dependency>
<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>java-driver-mapper-runtime</artifactId>
    <version>4.18.1</version>
</dependency>
<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>java-driver-mapper-processor</artifactId>
    <version>4.18.1</version>
</dependency>
Below is the Bean configuration I'm using with SSL certificates for connecting to the Cassandra cluster:
@Configuration
public class CassandraClientConfig extends AbstractCassandraConfiguration {

    @Bean("cluster")
    @Primary
    public CqlSession sslSession() throws Exception {
        // Load the certificate and private key into a KeyStore
        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        keyStore.load(null, null);
        SSLContext sslContext;
        try {
            Path tmCertPath = Paths.get(cassandraClient.getAppCertPath());
            Path tmPrivateKeyPath = Paths.get(cassandraClient.getAppKeyPath());
            X509Certificate certificate = kmiDefinitions.loadCertificate(tmCertPath);
            PrivateKey privateKey = kmiDefinitions.loadPrivateKey(tmPrivateKeyPath);
            KeyStore.PrivateKeyEntry keyEntry = new KeyStore.PrivateKeyEntry(privateKey, new Certificate[]{certificate});
            keyStore.setEntry("test_management_secrets", keyEntry, new KeyStore.PasswordProtection("".toCharArray()));
            KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
            keyManagerFactory.init(keyStore, "".toCharArray());
            sslContext = SSLContext.getInstance("TLS");
            sslContext.init(keyManagerFactory.getKeyManagers(), null, null);
        } catch (GeneralSecurityException | IOException e) {
            logger.error("Could not create mutual SSL Context!", e);
            throw e;
        }
        return CqlSession.builder()
                .addContactPoint(new InetSocketAddress(cassandraClient.getPrimaryHost(), 9042))
                .addContactPoint(new InetSocketAddress(cassandraClient.getFallbackHost(), 9042))
                .withLocalDatacenter("SEA")
                .withSslContext(sslContext)
                .withKeyspace(cassandraClient.getKeySpace())
                .addRequestTracker(new MultiplexingRequestTracker())
                .build();
    }
}
Below is the custom YAML config I'm using for the Cassandra hostnames and SSL secrets:
application:
  cassandra-client:
    app-cert-path: some_relative_path.certificate
    app-key-path: some_relative_path.private_key
    key-space: akatest_sqa2
    primary-host: api-beta.caas.dbattery.com
    fallback-host: fallback-beta.caas.dbattery.com
Below is the error I'm currently facing. Note that this is not my local environment; I'm running in a staging environment.
Caused by: com.datastax.oss.driver.api.core.AllNodesFailedException: Could not reach any contact point, make sure you've provided valid addresses (showing first 1 nodes, use getAllErrors() for more): Node(endPoint=localhost/<unresolved>:9042, hostId=null, hashCode=18682432): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (io.netty.channel.StacklessClosedChannelException)]
at com.datastax.oss.driver.api.core.AllNodesFailedException.copy(AllNodesFailedException.java:143)
at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:151)
at com.datastax.oss.driver.api.core.session.SessionBuilder.build(SessionBuilder.java:837)
at org.springframework.data.cassandra.config.CqlSessionFactoryBean.buildSystemSession(CqlSessionFactoryBean.java:535)
at org.springframework.data.cassandra.config.CqlSessionFactoryBean.afterPropertiesSet(CqlSessionFactoryBean.java:469)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1853)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1802)
... 24 more
Suppressed: com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (io.netty.channel.StacklessClosedChannelException)
at com.datastax.oss.driver.internal.core.channel.ProtocolInitHandler$InitRequest.fail(ProtocolInitHandler.java:358)
at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.writeListener(ChannelHandlerRequest.java:89)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:185)
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.send(ChannelHandlerRequest.java:78)
at com.datastax.oss.driver.internal.core.channel.ProtocolInitHandler$InitRequest.send(ProtocolInitHandler.java:195)
at com.datastax.oss.driver.internal.core.channel.ProtocolInitHandler.onRealConnect(ProtocolInitHandler.java:126)
at com.datastax.oss.driver.internal.core.channel.ConnectInitHandler.lambda$connect$0(ConnectInitHandler.java:59)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:118)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:326)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:342)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
Suppressed: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:9042
Caused by: java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.Net.pollConnect(Native Method)
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:339)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.netty.channel.StacklessClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0()(Unknown Source)
Initially, when I had not explicitly defined the local datacenter, I encountered the error below:
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'cluster' defined in class path resource [com/akamai/commons/configs/CassandraClientConfig.class]: Failed to instantiate [com.datastax.oss.driver.api.core.CqlSession]: Factory method 'sslSession' threw exception with message: Since you provided explicit contact points, the local DC must be explicitly set (see basic.load-balancing-policy.local-datacenter in the config, or set it programmatically with SessionBuilder.withLocalDatacenter). Current contact points are: Node(endPoint=api-beta.caas.dbattery.com/23.99.129.142:9042, hostId=ddcf16d5-3ca9-46ad-bce1-935c675245a0, hashCode=2c701135)=SEA, Node(endPoint=fallback-beta.caas.dbattery.com/23.99.129.141:9042, hostId=cbe38c1d-35ab-4f1c-9f4f-002e3ae1dd67, hashCode=1db3dd9b)=SEA. Current DCs in this cluster are: EWR, SEA
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:648)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:485)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1355)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1185)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1443)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1353)
at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:904)
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:782)
... 79 more
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.datastax.oss.driver.api.core.CqlSession]: Factory method 'sslSession' threw exception with message: Since you provided explicit contact points, the local DC must be explicitly set (see basic.load-balancing-policy.local-datacenter in the config, or set it programmatically with SessionBuilder.withLocalDatacenter). Current contact points are: Node(endPoint=api-beta.caas.dbattery.akamai.com/23.54.119.142:9042, hostId=ddcf16d5-3ca9-46ad-bce1-935c675245a0, hashCode=2c701135)=SEA, Node(endPoint=fallback-beta.caas.dbattery.akamai.com/23.54.119.141:9042, hostId=cbe38c1d-35ab-4f1c-9f4f-002e3ae1dd67, hashCode=1db3dd9b)=SEA. Current DCs in this cluster are: EWR, SEA
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:178)
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:644)
... 93 more
Caused by: java.lang.IllegalStateException: Since you provided explicit contact points, the local DC must be explicitly set (see basic.load-balancing-policy.local-datacenter in the config, or set it programmatically with SessionBuilder.withLocalDatacenter). Current contact points are: Node(endPoint=api-beta.caas.dbattery.akamai.com/23.54.119.142:9042, hostId=ddcf16d5-3ca9-46ad-bce1-935c675245a0, hashCode=2c701135)=SEA, Node(endPoint=fallback-beta.caas.dbattery.akamai.com/23.54.119.141:9042, hostId=cbe38c1d-35ab-4f1c-9f4f-002e3ae1dd67, hashCode=1db3dd9b)=SEA. Current DCs in this cluster are: EWR, SEA
at com.datastax.oss.driver.internal.core.loadbalancing.helper.MandatoryLocalDcHelper.discoverLocalDc(MandatoryLocalDcHelper.java:93)
at com.datastax.oss.driver.internal.core.loadbalancing.DefaultLoadBalancingPolicy.discoverLocalDc(DefaultLoadBalancingPolicy.java:122)
at com.datastax.oss.driver.internal.core.loadbalancing.BasicLoadBalancingPolicy.init(BasicLoadBalancingPolicy.java:170)
at com.datastax.oss.driver.internal.core.metadata.LoadBalancingPolicyWrapper.init(LoadBalancingPolicyWrapper.java:123)
at com.datastax.oss.driver.internal.core.session.DefaultSession$SingleThreaded.initializePools(DefaultSession.java:475)
at com.datastax.oss.driver.internal.core.session.DefaultSession$SingleThreaded.lambda$init$5(DefaultSession.java:393)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at com.datastax.oss.driver.internal.core.metadata.MetadataManager$SingleThreaded.lambda$startSchemaRequest$1(MetadataManager.java:450)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:614)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:653)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
Now I'm not sure what exactly the issue is. When I specify the exact datacenter named in the error, the driver tries to connect to localhost instead, and we do NOT have a local Cassandra running in the staging environment. I'm new to Cassandra; please help me resolve this issue, and let me know if any additional info is needed.
I have also tried setting a datacenter value for each contact point, as below:
CqlSession.builder()
        .addContactPoint(new InetSocketAddress(cassandraClient.getPrimaryHost(), 9042))
        .withLocalDatacenter("EWR")
        .addContactPoint(new InetSocketAddress(cassandraClient.getFallbackHost(), 9042))
        .withLocalDatacenter("SEA")
        .withSslContext(sslContext)
        .withKeyspace(cassandraClient.getKeySpace())
        .addRequestTracker(new MultiplexingRequestTracker())
        .build();
With the above config, I got the error below:
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'cluster' defined in class path resource [com/akamai/commons/configs/CassandraClientConfig.class]: Failed to instantiate [com.datastax.oss.driver.api.core.CqlSession]: Factory method 'sslSession' threw exception with message: Multiple entries with same key: default=EWR and default=SEA
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:648)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:485)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1355)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1185)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1443)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1353)
at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:904)
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:782)
Since I don't have a local Cassandra in staging, I also tried using an alternative load-balancing policy as specified here, but even with this I get the exception below, where it still tries to connect to localhost:
Caused by: com.datastax.oss.driver.api.core.AllNodesFailedException: Could not reach any contact point, make sure you've provided valid addresses (showing first 1 nodes, use getAllErrors() for more): Node(endPoint=localhost/<unresolved>:9042, hostId=null, hashCode=5ad2d127): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (io.netty.channel.StacklessClosedChannelException)]
at com.datastax.oss.driver.api.core.AllNodesFailedException.copy(AllNodesFailedException.java:143)
at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:151)
at com.datastax.oss.driver.api.core.session.SessionBuilder.build(SessionBuilder.java:837)
at org.springframework.data.cassandra.config.CqlSessionFactoryBean.buildSystemSession(CqlSessionFactoryBean.java:535)
at org.springframework.data.cassandra.config.CqlSessionFactoryBean.afterPropertiesSet(CqlSessionFactoryBean.java:469)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1853)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1802)
Thanks in advance for any help.
I figured out that extending the class AbstractCassandraConfiguration was causing this issue. Once I removed it from the configuration class, the application started contacting the endpoints that are explicitly configured.
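The stack traces in the question already hint at this. The failing session is built by CqlSessionFactoryBean.buildSystemSession, not by the sslSession factory method, and its endpoint prints as localhost/<unresolved>:9042, whereas the explicitly configured contact points print with a resolved IP (for example api-beta.caas.dbattery.com/23.99.129.142:9042). Below is a minimal sketch of that difference using only java.net. The class and helper names are hypothetical, and treating the inherited contact point as an unresolved address is an assumption based purely on how it appears in the log:

```java
import java.net.InetSocketAddress;

final class ContactPointFormats {

    // Builds an address without any DNS lookup. Assumption: this is how a contact
    // point supplied as a plain host string ends up looking in the error message.
    static InetSocketAddress fromHostString(String host, int port) {
        return InetSocketAddress.createUnresolved(host, port);
    }

    // Builds an address the way the question's builder calls do:
    // new InetSocketAddress(host, port) attempts resolution immediately.
    static InetSocketAddress eager(String host, int port) {
        return new InetSocketAddress(host, port);
    }

    public static void main(String[] args) {
        InetSocketAddress inherited = fromHostString("localhost", 9042);
        // An IP literal is used here so the sketch needs no DNS lookup.
        InetSocketAddress explicit = eager("127.0.0.1", 9042);

        System.out.println(inherited + " unresolved=" + inherited.isUnresolved());
        System.out.println(explicit + " unresolved=" + explicit.isUnresolved());
    }
}
```

Seeing an unresolved localhost endpoint next to CqlSessionFactoryBean frames is a reasonable sign that a second, framework-created session is being built alongside the one configured by hand.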
AbstractCassandraConfiguration in turn extends another abstract class, AbstractSessionConfiguration. That class declares a bean named cassandraSession with basic defaults, and it uses localhost as its contact point, as shown below:
@Bean
public CqlSessionFactoryBean cassandraSession() {
    CqlSessionFactoryBean bean = new CqlSessionFactoryBean();
    bean.setContactPoints(getContactPoints());
    bean.setKeyspaceCreations(getKeyspaceCreations());
    bean.setKeyspaceDrops(getKeyspaceDrops());
    bean.setKeyspaceName(getKeyspaceName());
    bean.setKeyspaceStartupScripts(getStartupScripts());
    bean.setKeyspaceShutdownScripts(getShutdownScripts());
    bean.setLocalDatacenter(getLocalDataCenter());
    bean.setPort(getPort());
    bean.setSessionBuilderConfigurer(getSessionBuilderConfigurerWrapper());
    return bean;
}

protected String getContactPoints() {
    return CqlSessionFactoryBean.DEFAULT_CONTACT_POINTS;
}
The value CqlSessionFactoryBean.DEFAULT_CONTACT_POINTS is defined as a static constant:
public static final String DEFAULT_CONTACT_POINTS = "localhost";
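To make the mechanism concrete, here is a small plain-Java model of it. It is only a sketch: BaseSessionConfig, QuestionStyleConfig, and OverridingConfig are hypothetical stand-ins, not the Spring classes. It shows why a configuration class that extends the base class without overriding getContactPoints() ends up with a session pointed at localhost, and how an override changes that:

```java
import java.util.Arrays;
import java.util.List;

final class InheritedDefaultDemo {
    public static void main(String[] args) {
        // No override: the inherited default wins.
        System.out.println(new QuestionStyleConfig().sessionContactPoints());
        // Override: the explicitly configured hosts are used.
        System.out.println(new OverridingConfig().sessionContactPoints());
    }
}

// Hypothetical stand-in for the framework base class.
abstract class BaseSessionConfig {
    static final String DEFAULT_CONTACT_POINTS = "localhost";

    // Overridable hook with a default, like getContactPoints() shown above.
    protected String getContactPoints() {
        return DEFAULT_CONTACT_POINTS;
    }

    // Plays the role of the inherited cassandraSession() bean method:
    // it always exists in subclasses and reads the hook.
    final List<String> sessionContactPoints() {
        return Arrays.asList(getContactPoints().split(","));
    }
}

// Mirrors the question: extends the base class but overrides nothing.
final class QuestionStyleConfig extends BaseSessionConfig {
}

// Alternative to dropping the base class: override the hook.
final class OverridingConfig extends BaseSessionConfig {
    @Override
    protected String getContactPoints() {
        return "api-beta.caas.dbattery.com,fallback-beta.caas.dbattery.com";
    }
}
```

In this model the first line printed is [localhost] and the second lists the two configured hosts. If you want to keep AbstractCassandraConfiguration, the analogous step would presumably be to override getContactPoints() and getLocalDataCenter() in the configuration class; I have not tested that route, since removing the base class was enough in my case.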