I am having a problem with the DataStax Spark Connector for Cassandra. My application contains a Spark operation that performs a number of single-record queries on the Cassandra database; a number of these queries succeed, but at some point one of them fails with a NoHostAvailableException carrying the message "All host(s) tried for query failed (no host was tried)":
2018-06-26 12:32:09 ERROR Executor:91 - Exception in task 0.3 in stage 0.0 (TID 6)
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
at com.sun.proxy.$Proxy15.execute(Unknown Source)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
at com.sun.proxy.$Proxy16.execute(Unknown Source)
at [line that contains the session.execute() call]
[...]
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:211)
at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:46)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:275)
at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:115)
at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:95)
at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
... 32 more
In an attempt to analyse this problem, I have reproduced it in a simple environment. Below is the minimal code with which I can trigger the issue:
val pkColumn1Value = 1L
val pkColumn2Values: Dataset[Long] = sparkSession.createDataset(1L to 19 by 2)
val connector: CassandraConnector = [...]
val results: Dataset[SimpleValue] = pkColumn2Values.mapPartitions { iterator =>
  connector.withSessionDo { session =>
    val clusteringKeyValues = Seq(...)
    val preparedStatement = session.prepare("select * from simple_values where pk_column_1_value = ? and pk_column_2_value = ? and clustering_key_value = ?")
    iterator.flatMap { pkColumn2Value =>
      val boundStatements = clusteringKeyValues.iterator.map(clusteringKeyValue =>
        preparedStatement.bind(
          pkColumn1Value.asInstanceOf[AnyRef]
          , pkColumn2Value.asInstanceOf[AnyRef]
          , clusteringKeyValue.asInstanceOf[AnyRef]
        )
      )
      boundStatements.map { boundStatement =>
        val record = try {
          session.execute(boundStatement).one()
        } catch {
          case noHostAvailableException: NoHostAvailableException =>
            log.error(s"Encountered NHAE, getErrors: ${noHostAvailableException.getErrors}")
            throw noHostAvailableException
          case exception =>
            throw exception
        }
        log.error(s"Retrieved record $record")
        // Sleep to simulate an operation being performed on the value.
        Thread.sleep(100)
        record
      }
    }
  }
}
log.error(s"Perfunctory log statement that triggers an action: ${results.collect().last}")
I am using Dataset#mapPartitions() to be able to prepare the select statement only once per partition. The problem disappears when I swallow my pride and use Dataset#map() or Dataset#flatMap() instead (a sketch of that variant is included further down), but I would like to use Dataset#mapPartitions() for the (ostensible) performance benefit of preparing the query only once per Dataset partition.

Some other observations and things I have already tried:

- The NoHostAvailableException seems to occur a fixed amount of time after the first query is executed. Some investigation confirmed that this amount of time equals the value of the connector property spark.cassandra.connection.keep_alive_ms. Setting this property to a ridiculously high value would ostensibly resolve the problem, but this seems like a dirty work-around rather than a sensible solution.
- In this GitHub issue for the connector, commenter pkolaczk mentions a potential issue that could cause the connector to succeed in its initial connection to Cassandra but fail when it later tries to establish additional connections. This sounds promising because it matches the points above (which suggest that the problem only occurs once the original connections have been closed, which would never happen if the connection were re-established for every element of the Dataset individually); however, I have been unable to find any indication that I have misconfigured an IP address, or any other plausible cause for this phenomenon (or even confirmation that this phenomenon is in fact causing the issue in the first place).
- NoHostAvailableExceptions are supposedly always preceded by other errors. I have checked my logs on multiple occasions, but cannot find any other error messages or stack traces.
- NoHostAvailableException#getErrors should give a more detailed explanation of the failure, but this method always returns an empty map for me (when using mapPartitions and not when using map).
- spark.cassandra.connection.local_dc was originally unset. Setting this property to the appropriate data center name had no noticeable effect on the issue.
- Setting spark.cassandra.connection.timeout_ms and spark.cassandra.read.timeout_ms to ridiculously high values had no noticeable effect on the issue.

Any indication of what is causing these errors or an idea of how to fix the issue would be greatly appreciated.
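For reference, below is a minimal sketch of the Dataset#flatMap() fallback mentioned above, assuming the same pkColumn1Value, pkColumn2Values, clusteringKeyValues and connector as in the snippet earlier; as in that snippet, the mapping of the returned driver Row to a SimpleValue is glossed over. The statement is prepared anew for every element, which is exactly the overhead I hoped to avoid with Dataset#mapPartitions():

val resultsViaFlatMap = pkColumn2Values.flatMap { pkColumn2Value =>
  // The session is borrowed from the connector's cache and released again for
  // every element, so (per the reasoning above) it is never idle for long.
  connector.withSessionDo { session =>
    // Prepared anew for every element rather than once per partition.
    val preparedStatement = session.prepare("select * from simple_values where pk_column_1_value = ? and pk_column_2_value = ? and clustering_key_value = ?")
    clusteringKeyValues.map { clusteringKeyValue =>
      session.execute(
        preparedStatement.bind(
          pkColumn1Value.asInstanceOf[AnyRef]
          , pkColumn2Value.asInstanceOf[AnyRef]
          , clusteringKeyValue.asInstanceOf[AnyRef]
        )
      ).one() // Returns a Row; converting it to SimpleValue is omitted, as above.
    }
  }
}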
Update: I cross-posted this question to the connector's Google User Group (https://groups.google.com/a/lists.datastax.com/d/msg/spark-connector-user/oWrP7qeHJ7k/pmgnF_kbBwAJ), where one of its contributors confirmed that there is no reason not to use a high value for spark.cassandra.connection.keep_alive_ms. I have bumped this value up to a point where I can be reasonably certain that no operation will outlast it, and have had no problems since.
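For anyone who runs into the same problem: this is roughly how the property can be raised when the SparkSession is built in code (the same setting can be passed via spark-submit --conf). The two-hour value below is an arbitrary assumption; it just needs to comfortably exceed the longest time any single partition can take.

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .appName("cassandra-lookup-job") // placeholder name
  .config("spark.cassandra.connection.host", "...") // as appropriate for the cluster
  // Keep cached Cassandra sessions alive well past the longest-running task.
  .config("spark.cassandra.connection.keep_alive_ms", "7200000")
  .getOrCreate()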