I am trying to write data to a Cassandra table (Azure Cosmos DB) from an Azure Databricks job (Spark Structured Streaming). I get the exception below:
Query [id = , runId = ] terminated with exception: Failed to open native connection to Cassandra at {<name>.cassandra.cosmosdb.azure.com:10350} :: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract
Caused by: IOException: Failed to open native connection to Cassandra at {<name>.cassandra.cosmosdb.azure.com:10350} :: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract
Caused by: AbstractMethodError: Method com/microsoft/azure/cosmosdb/cassandra/CosmosDbConnectionFactory$.createSession(Lcom/datastax/spark/connector/cql/CassandraConnectorConf;)Lcom/datastax/oss/driver/api/core/CqlSession; is abstract
What I did to get here:
What I tried:
Different versions of the connector and of the Azure Cosmos DB helper libraries, but every combination fails with one ClassNotFoundException or method-not-found error or another.
Code Snippet:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.log4j.Logger
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.cassandra._
import java.time.LocalDateTime
def writeDelta(spark: SparkSession, dataFrame: DataFrame, sourceName: String, checkpointLocation: String, dataPath: String, loadType: String, log: Logger): Boolean = {
  // Connector tuning for the Cosmos DB Cassandra endpoint
  spark.conf.set("spark.cassandra.output.batch.size.rows", "1")
  spark.conf.set("spark.cassandra.connection.remoteConnectionsPerExecutor", "10")
  spark.conf.set("spark.cassandra.connection.localConnectionsPerExecutor", "10")
  spark.conf.set("spark.cassandra.output.concurrent.writes", "100")
  spark.conf.set("spark.cassandra.concurrent.reads", "512")
  spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000")
  spark.conf.set("spark.cassandra.connection.keepAliveMS", "60000000") // Increase this number as needed
  spark.conf.set("spark.cassandra.output.ignoreNulls", "true")
  spark.conf.set("spark.cassandra.connection.host", "*******.cassandra.cosmosdb.azure.com")
  spark.conf.set("spark.cassandra.connection.port", "10350")
  spark.conf.set("spark.cassandra.connection.ssl.enabled", "true")
  // spark.cassandra.auth.username and password are set in cluster conf
  // foreachBatch replaces the sink, so the Cassandra format and table options are set only in upsertToDelta
  val write = dataFrame.writeStream
    .foreachBatch(upsertToDelta _)
    .outputMode("update")
    .option("checkpointLocation", checkpointLocation)
    .start()
  write.awaitTermination()
  true // reached only when the query stops without throwing
}
def upsertToDelta(newBatch: DataFrame, batchId: Long): Unit = {
  // Cache first: the batch is scanned twice (count, then write)
  newBatch.persist()
  try {
    println(LocalDateTime.now())
    println("BATCH ID = " + batchId + " REC COUNT = " + newBatch.count())
    // Keep only the latest row per key; keyColumn and timestampCol are defined elsewhere (not shown in this snippet)
    val userWindow = Window.partitionBy(keyColumn).orderBy(col(timestampCol).desc)
    val deDup = newBatch.withColumn("rank", row_number().over(userWindow)).where(col("rank") === 1).drop("rank")
    deDup.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "****", "keyspace" -> "****"))
      .mode("append")
      .save()
  } finally {
    // Release the cache even when the write fails; the exception still propagates
    newBatch.unpersist()
  }
}
############################
After implementing the solution suggested by @theo-van-kraay, I get the following error in the executor logs (the job keeps running even after this error):
23/02/13 07:28:55 INFO CassandraConnector: Connected to Cassandra cluster.
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 9 (task 26, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Committed partition 9 (task 26, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO Executor: Finished task 9.0 in stage 6.0 (TID 26). 1511 bytes result sent to driver
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 7 (task 24, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 1 (task 18, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 3 (task 20, attempt 0, stage 6.0)
23/02/13 07:28:56 INFO DataWritingSparkTask: Commit authorized for partition 5 (task 22, attempt 0, stage 6.0)
23/02/13 07:28:56 ERROR Utils: Aborting task
java.lang.IllegalArgumentException: Unable to get Token Metadata
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$tokenMap$1(LocalNodeFirstLoadBalancingPolicy.scala:86)
at scala.Option.orElse(Option.scala:447)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.tokenMap(LocalNodeFirstLoadBalancingPolicy.scala:86)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.replicasForRoutingKey$1(LocalNodeFirstLoadBalancingPolicy.scala:103)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$8(LocalNodeFirstLoadBalancingPolicy.scala:107)
at scala.Option.flatMap(Option.scala:271)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$7(LocalNodeFirstLoadBalancingPolicy.scala:107)
at scala.Option.orElse(Option.scala:447)
at com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.$anonfun$getReplicas$3(LocalNodeFirstLoadBalancingPolicy.scala:107)
at scala.Option.flatMap(Option.scala:271)
...
...
23/02/13 07:28:56 ERROR Utils: Aborting task
You can remove:
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0
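It is not required with the Spark 3 Cassandra Connector; it was created for Spark 2 only. The AbstractMethodError above is consistent with that: the helper's CosmosDbConnectionFactory does not implement the createSession method that the newer connector calls. Also remove any references to it in the code and in the cluster configuration.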
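To check for leftover references after removing the library, a small helper along these lines may be useful. It is only a sketch: `HelperReferences`, `sampleConf` and the sample host name are hypothetical names made up for illustration, and the package prefix is taken from the stack trace in the question. It assumes that a leftover reference shows up as a configuration value naming a class from that package, for example a `spark.cassandra.connection.factory` setting.

```scala
object HelperReferences {
  // Package of the Spark 2-only helper, as it appears in the stack trace in the question.
  val HelperPackage = "com.microsoft.azure.cosmosdb.cassandra"

  // Returns the settings whose value still names a class from the helper package.
  def find(conf: Map[String, String]): Map[String, String] =
    conf.filter { case (_, value) => value.contains(HelperPackage) }
}

// Hypothetical settings for illustration only.
val sampleConf = Map(
  "spark.cassandra.connection.host" -> "example.cassandra.cosmosdb.azure.com",
  "spark.cassandra.connection.port" -> "10350",
  "spark.cassandra.connection.factory" -> "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory"
)

val leftovers = HelperReferences.find(sampleConf)
leftovers.keys.foreach(key => println(s"Remove or unset: $key"))
```

This runs as written in a notebook cell or the Scala REPL. In a real job you could pass `spark.sparkContext.getConf.getAll.toMap` instead of `sampleConf`. Any key it reports should be removed from the cluster configuration or the code before restarting the cluster.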