apache-spark, apache-spark-sql, spark-cassandra-connector, scala-spark

Performance degraded after upgrading from spark-cassandra-connector 2.4.0 to 3.1.0


Context: I am working on a message-processing application that handles millions of messages every day. The application is built with Scala and Spark and uses Kafka and Cassandra. Multiple DB queries are executed while processing each message. The application was recently migrated from a standalone Spark cluster to k8s (using mlrun and spark-operator). As part of this migration, the Scala version and the dependency versions were upgraded.

Issue: I noticed that the Cassandra queries take longer to execute after the migration, which has degraded our application's performance (slower processing of messages). The Cassandra cluster is the same one used previously.

Following are the dependency versions:

Before migration-

<properties>
    <scala.tools.version>2.11</scala.tools.version>
    <scala.version>2.11.12</scala.version>
    <spark.version>2.4.0</spark.version> 
</properties>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.tools.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-unshaded_2.11</artifactId>
    <version>2.4.0</version>
</dependency>

After migration-

<properties>
    <scala.tools.version>2.12</scala.tools.version>
    <scala.version>2.12.10</scala.version>
    <spark.version>3.1.2</spark.version>
</properties>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.tools.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_${scala.tools.version}</artifactId>
    <version>3.1.0</version>
</dependency>

The changes made for the migration were:

  1. Scala and related library (spark-cassandra-connector) version upgrades.
  2. Moved from a standalone Spark cluster to k8s. This should not affect the query execution time, because both sets of dependencies were also tested locally, and the large difference in query execution time between the versions shows up there as well.

The issue is probably caused by the version upgrade (specifically the spark-cassandra-connector version).

I wrote a simple Scala program (which replicates the logic used in our actual application) to check why there is an increase in Cassandra DB query execution time:

import scala.io.Source

import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

import org.apache.spark.sql.SparkSession
import com.datastax.spark.connector.toSparkContextFunctions
import org.apache.log4j.{Level, LogManager}
import org.joda.time.DateTime

object SimpleCassandraProgram {

  def main(args: Array[String]): Unit = {

    val sBuild = SparkSession.builder()
      .appName(args(5))
      .master(args(0).trim) // local
      .config("spark.cassandra.connection.host","<cassandra_host>")
    val spark = sBuild.getOrCreate()
    val sc = spark.sparkContext

    @transient lazy val log = LogManager.getLogger(sc.appName)
    LogManager.getRootLogger.setLevel(Level.DEBUG)

    val where_field = args(3)

    var count = 0
    var totalTime: Long = 0
    val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

    val txtFile = args(4).trim

    println(f"START TIME: " + DateTime.now())

    // The file I used contains 10 lines => 10 select queries will be executed
    Source.fromFile(txtFile).getLines().foreach(line => {
      count += 1
      var isProcessed = false
      val startTime = LocalDateTime.now
      log.info("Start of Query Execution: " + fmt.format(startTime) + " Run number: " + count)

      // args(1) is keyspace name, args(2) is table name
      val resRDD = sc.cassandraTable(args(1), args(2)).select("<field>").where(s"${where_field} =?", line)

      if (!resRDD.isEmpty()) {
        val latestRow = resRDD.first()
        val field_value = latestRow.getStringOption("<field>")

        if (field_value.isDefined) {
          isProcessed = true
          log.info("Record has already been processed")
        }

      }

      val finishTime = LocalDateTime.now
      log.info("End of Query Execution: " + fmt.format(finishTime) + " Run number: " + count)
      val timeDiff = ChronoUnit.MILLIS.between(startTime, finishTime)
      log.info("Took " + timeDiff + "ms to Execute the Query.")

      // Excluding first query execution time since it includes the time to connect to Cassandra
      if (count != 1) totalTime += timeDiff
    })
    println(f"FINISH TIME: " + DateTime.now())
    println(f"AVERAGE QUERY EXECUTION TIME (EXCLUDING FIRST QUERY) - ${totalTime/(count - 1)}")
  }

}
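
For reference, this is roughly how the program is invoked; the argument order follows the code above and all values below are placeholders:

// Hypothetical invocation (argument order assumed from the program above):
// args(0) = Spark master, args(1) = keyspace, args(2) = table,
// args(3) = where-clause field, args(4) = input file path, args(5) = app name
SimpleCassandraProgram.main(Array(
  "local[*]",
  "my_keyspace",
  "my_table",
  "message_id",
  "/tmp/message_ids.txt",
  "cassandra-test"
))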

I ran the above code with dependencies used before migration, and after migration. Here are the results:

With Before Migration dependencies- AVERAGE QUERY EXECUTION TIME (EXCLUDING FIRST QUERY) - 1468ms

With After Migration dependencies- AVERAGE QUERY EXECUTION TIME (EXCLUDING FIRST QUERY) - 4109ms

From the above, it can be observed that the query time has increased significantly with just the change of dependency versions.

The difference I noticed between the two versions (from the debug logs) is that, with the newer dependencies, a bunch of internal queries are executed before the actual job starts, and this causes a delay.

Before-migration dependencies / older version (not much delay before the job starts)-

23/02/07 14:45:38 INFO cassandra-test: Start of Query Execution: 2023-02-07 14:45:38.491 Run number: 3
...
(no internal db queries executed/ not present in debug logs)
...
23/02/07 14:45:38 INFO SparkContext: Starting job: take at CassandraRDD.scala:127

After-migration dependencies / newer version (delay before the job starts, caused by many internal queries being executed)-

23/02/07 14:49:03 INFO cassandra-test: Start of Query Execution: 2023-02-07 14:49:03.586 Run number: 3
...
...
23/02/07 14:49:03 DEBUG SchemaAgreementChecker: [s0] Checking schema agreement
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Executing query 'SELECT schema_version FROM system.local WHERE key='local''
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Executing query 'SELECT * FROM system.peers'
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Successfully wrote query 'SELECT schema_version FROM system.local WHERE key='local'', waiting for response
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Successfully wrote query 'SELECT * FROM system.peers', waiting for response
...
...
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Executing query 'SELECT * FROM system_schema.keyspaces'
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Executing query 'SELECT * FROM system_schema.types'
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Executing query 'SELECT * FROM system_schema.tables'
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Executing query 'SELECT * FROM system_schema.columns'
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Executing query 'SELECT * FROM system_schema.indexes'
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Executing query 'SELECT * FROM system_schema.views'
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Executing query 'SELECT * FROM system_schema.functions'
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Executing query 'SELECT * FROM system_schema.aggregates'
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Successfully wrote query 'SELECT * FROM system_schema.keyspaces', waiting for response
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Successfully wrote query 'SELECT * FROM system_schema.types', waiting for response
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Successfully wrote query 'SELECT * FROM system_schema.tables', waiting for response
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Successfully wrote query 'SELECT * FROM system_schema.columns', waiting for response
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Successfully wrote query 'SELECT * FROM system_schema.indexes', waiting for response
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Successfully wrote query 'SELECT * FROM system_schema.views', waiting for response
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Successfully wrote query 'SELECT * FROM system_schema.functions', waiting for response
23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Successfully wrote query 'SELECT * FROM system_schema.aggregates', waiting for response
...
...
23/02/07 14:49:04 DEBUG RuleBasedKeyspaceFilter: [s0] Filtering out 'keyspace_xxxxx' because it matches at least one regex exclude
23/02/07 14:49:04 DEBUG RuleBasedKeyspaceFilter: [s0] Filtering out 'keyspace_yyyyy' because it matches at least one regex exclude
...
...
23/02/07 14:49:05 INFO SparkContext: Starting job: take at CassandraRDD.scala:126

I suspect that these additional queries, executed before our application's actual query, are causing the performance degradation. Is this actually the case? What is the root cause of the increase in query execution time between the two versions?

The above discussion covers only SELECT query performance; however, we have observed an increase in query time for INSERT and UPDATE queries as well.

The execution time of each query has almost tripled after moving to the newer dependency versions, which is causing huge performance degradation in our application. What would be the best approach to resolve this issue (other than downgrading the dependencies)?


Solution

  • The "bunch of internal queries" you refer to are in fact debug messages from the Cassandra Java driver embedded in the connector.

    The Spark Cassandra connector uses the Java driver under the hood to connect to Cassandra clusters. Version 2.4.0 of the connector shipped with Java driver v3.1.4 while the connector v3.1.0 shipped with Java driver v4.12.0.

    Java driver v4.x was completely refactored and happens to log additional debug messages during its initialisation phase, i.e. when it connects to a Cassandra cluster for the first time. When the client/driver starts, it initialises itself by performing administrative tasks which, as your debug log shows, include:

      • checking for schema agreement across nodes (SchemaAgreementChecker),
      • discovering the cluster topology by querying system.local and system.peers, and
      • refreshing the schema metadata by querying the system_schema tables (keyspaces, types, tables, columns, indexes, views, functions, aggregates).

    These administrative tasks are performed by all versions of the Java driver. The only difference is that v4.x logs them as DEBUG messages.

    If you look closely, the timestamps on the log entries you posted show that there was virtually no difference in time from the first debug entry to the last:

    23/02/07 14:49:03 DEBUG SchemaAgreementChecker: [s0] Checking schema agreement
    ...
    23/02/07 14:49:03 DEBUG AdminRequestHandler: [s0] Successfully wrote query 'SELECT * FROM system_schema.aggregates', waiting for response
    

    Just because earlier versions of the driver did not log this information doesn't mean these tasks didn't happen, so your conclusion is flawed.
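
    One way to verify this yourself is to open the connector's session explicitly before any timed queries and watch whether the metadata DEBUG messages repeat for subsequent queries. A minimal sketch, assuming the SparkContext (sc) from your test program; the warm-up query itself is just an example:

    import com.datastax.spark.connector.cql.CassandraConnector

    // Force the driver to connect and complete its metadata bootstrap once,
    // outside the timed loop. If the DEBUG metadata queries appear only here
    // and not around the per-message queries, they are one-off initialisation
    // work rather than a per-query cost.
    val connector = CassandraConnector(sc.getConf)
    connector.withSessionDo { session =>
      session.execute("SELECT release_version FROM system.local")
    }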

    The thing you should focus on is the fact that you've upgraded from Spark 2.4 to Spark 3.1 -- that's a massive jump. You need to analyse the differences between the various stages of the Spark jobs that get executed, using the Spark UI.
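
    If the k8s-run jobs are too short-lived to inspect interactively, you can enable Spark event logging and compare the runs afterwards in the history server. A minimal sketch, assuming /tmp/spark-events is a directory you have already created:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("cassandra-query-comparison")                    // placeholder name
      .config("spark.eventLog.enabled", "true")                 // record job/stage events
      .config("spark.eventLog.dir", "file:///tmp/spark-events") // assumed log directory
      .getOrCreate()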

    Note that it won't be an apples-for-apples comparison because of the major version difference between Spark 2.4 and 3.1. Cheers!