apache-spark, cassandra, spark-cassandra-connector

NoHostAvailableException (no host was tried) using Spark Cassandra Connector


I am having a problem with the DataStax Spark Connector for Cassandra. My application contains a Spark operation that performs a number of single-record queries against the Cassandra database. A number of these queries succeed, but at some point one of them fails with a NoHostAvailableException carrying the message "All host(s) tried for query failed (no host was tried)".

Stack trace

2018-06-26 12:32:09 ERROR Executor:91 - Exception in task 0.3 in stage 0.0 (TID 6)
com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
    at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
    at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
    at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:245)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:68)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
    at com.sun.proxy.$Proxy15.execute(Unknown Source)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:40)
    at com.sun.proxy.$Proxy16.execute(Unknown Source)
    at [line that contains the session.execute() call]
    [...]
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
    at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:211)
    at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:46)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:275)
    at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:115)
    at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:95)
    at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)
    ... 32 more

In an attempt to analyse this problem, I have reproduced it in a simplified environment. Below is the minimal code with which I can trigger the issue.

Code

import com.datastax.driver.core.exceptions.NoHostAvailableException
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.Dataset

import sparkSession.implicits._ // provides the encoders needed by createDataset and mapPartitions

val pkColumn1Value = 1L
val pkColumn2Values: Dataset[Long] = sparkSession.createDataset(1L to 19L by 2L)
val connector: CassandraConnector = [...]

// SimpleValue is a small case class from the application (definition elided).
val results: Dataset[SimpleValue] = pkColumn2Values.mapPartitions { iterator =>
    connector.withSessionDo { session =>
        // The clustering-key values to query for each partition-key combination.
        val clusteringKeyValues = Seq(...)

        // Prepare the statement once per partition and reuse it for every bind.
        val preparedStatement = session.prepare("select * from simple_values where pk_column_1_value = ? and pk_column_2_value = ? and clustering_key_value = ?")

        iterator.flatMap { pkColumn2Value =>
            val boundStatements = clusteringKeyValues.iterator.map(clusteringKeyValue =>
                preparedStatement.bind(
                    pkColumn1Value.asInstanceOf[AnyRef]
                    , pkColumn2Value.asInstanceOf[AnyRef]
                    , clusteringKeyValue.asInstanceOf[AnyRef]
                )
            )

            boundStatements.map { boundStatement =>
                val record = try {
                    session.execute(boundStatement).one()
                } catch {
                    case noHostAvailableException: NoHostAvailableException =>
                        // getErrors lists which hosts failed and why; with
                        // "(no host was tried)" it comes back empty.
                        log.error(s"Encountered NHAE, getErrors: ${noHostAvailableException.getErrors}")
                        throw noHostAvailableException
                    case exception: Throwable =>
                        throw exception
                }

                log.error(s"Retrieved record $record")
                // Sleep to simulate an operation being performed on the value.
                Thread.sleep(100)

                // Mapping the driver's Row to a SimpleValue is elided in this repro.
                record
            }
        }
    }
}

log.error(s"Perfunctory log statement that triggers an action: ${results.collect().last}")

Some interesting things I have noticed

In a GitHub issue for the connector, commenter pkolaczk mentions a potential problem that can cause the connector to succeed in its initial connection to Cassandra but fail when it later tries to establish additional connections. This sounds promising, because it matches my observations: the problem seems to occur only once the original connections have been closed, which would never happen if the connection were re-established for every element in the Dataset individually (see the sketch below). However, I have been unable to find any indication that I have misconfigured an IP address, any other plausible cause for this phenomenon, or even confirmation that this phenomenon is in fact causing the issue in the first place.
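
For concreteness, the per-element variant referred to above would look roughly like this. It is only a sketch that reuses the names from the repro; clusteringKeyValues and the Row-to-SimpleValue mapping are elided exactly as they are there:

val resultsPerElement = pkColumn2Values.mapPartitions { iterator =>
    iterator.flatMap { pkColumn2Value =>
        // One withSessionDo block per element: the session, and the cached
        // connection behind it, are re-acquired for every query, so they are
        // never left idle long enough to be closed.
        connector.withSessionDo { session =>
            val preparedStatement = session.prepare("select * from simple_values where pk_column_1_value = ? and pk_column_2_value = ? and clustering_key_value = ?")

            // clusteringKeyValues as defined in the repro above.
            clusteringKeyValues.map { clusteringKeyValue =>
                session.execute(preparedStatement.bind(
                    pkColumn1Value.asInstanceOf[AnyRef]
                    , pkColumn2Value.asInstanceOf[AnyRef]
                    , clusteringKeyValue.asInstanceOf[AnyRef]
                )).one()
            }
        }
    }
}

This variant pays for its robustness with far more session churn and repeated statement preparation, which is why the partition-level version above is the one I actually want to use.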

Some things I have checked and/or tried

Some version numbers

Any indication of what is causing these errors or an idea of how to fix the issue would be greatly appreciated.


Solution

  • I cross-posted this question to the connector's Google user group (https://groups.google.com/a/lists.datastax.com/d/msg/spark-connector-user/oWrP7qeHJ7k/pmgnF_kbBwAJ), where one of its contributors confirmed that there is no reason not to use a high value for spark.cassandra.connection.keep_alive_ms. I have bumped this value up to a point where I can be reasonably certain that no operation outlasts it, and I have had no problems since.
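
For reference, a minimal sketch of how such a value can be set when the session is built in code; the application name, host, and the 30-minute figure are illustrative assumptions, not values from the original application:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
    .appName("simple-values") // hypothetical application name
    .config("spark.cassandra.connection.host", "cassandra.example.com") // hypothetical host
    // Keep the connector's cached connections alive for far longer than any
    // single operation is expected to run (here: 30 minutes).
    .config("spark.cassandra.connection.keep_alive_ms", "1800000")
    .getOrCreate()

The same property can also be supplied on the command line via spark-submit --conf spark.cassandra.connection.keep_alive_ms=1800000 instead of in code.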