apache-spark, cassandra, cassandra-3.0, spark-cassandra-connector

Spark Cassandra connector not pushing down predicates/conditions; filtering is done in Spark instead


I am trying to migrate data from one Cassandra cluster (v3) to another Cassandra cluster (v4).

The table and schema are the same in the source and destination clusters:

CREATE KEYSPACE mykeyspace WITH replication = {'class': 'NetworkTopologyStrategy', 'DC': '3'}  AND durable_writes = true;

CREATE TABLE mykeyspace.mytable (monthyear text,
                                 key1 text,
                                 generatedid int,
                                 eventdatetime timestamp,
                                 value double,
                                 PRIMARY KEY ((monthyear, key1), generatedid, eventdatetime)
) WITH CLUSTERING ORDER BY (generatedid ASC, eventdatetime DESC);

Here is my very simple code (written in Java) using the Spark Cassandra Connector:

spark.sql("SELECT * FROM cassandra_old.mykeyspace.mytable WHERE monthyear = '10-2023'").explain();

The catalog name for the source C* cluster is cassandra_old. When I submit this application, the generated plan does not include the monthyear filter among the Cassandra filters; instead, that filtering is done by Spark.
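For context, a catalog like cassandra_old is typically registered through Spark configuration along these lines (a sketch: the host value is a placeholder, and the CassandraCatalog class is the standard catalog implementation shipped with the connector):

```
spark-submit \
  --conf spark.sql.catalog.cassandra_old=com.datastax.spark.connector.datasource.CassandraCatalog \
  --conf spark.sql.catalog.cassandra_old.spark.cassandra.connection.host=<source-cluster-host> \
  ...
```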

Here is the generated physical plan:

== Physical Plan ==
*(1) Project [monthyear#0, key1#1, generatedid#2, eventdatetime#3, value#4]
+- *(1) Filter (monthyear#0 = 10-2023)
   +- BatchScan mytable[monthyear#0, key1#1, generatedid#2, eventdatetime#3, value#4] Cassandra Scan: mykeyspace.mytable
      - Cassandra Filters: []
      - Requested Columns: [monthyear,key1,generatedid,eventdatetime,value]
      RuntimeFilters: []

I can't figure out what is wrong with the code. Because the filtering is done in Spark, the job reads and processes the entire table, which takes a long time.

Here are the versions I am using:

Spark Cassandra connector version: spark-cassandra-connector_2.12:3.4.1

Spark version: 3.4.1-amzn-0 (emr cluster)


Solution

  • For predicate pushdown to occur, you need to provide all of the partition key columns in your query, because restricting only part of a partition key is not supported by CQL/C*. Since you have a composite partition key on this table, i.e.

    PRIMARY KEY ((monthyear, key1), ..., ...)

    for your predicates to get pushed down, the query needs to restrict both monthyear and key1:

    spark.sql("SELECT * FROM cassandra_old.mykeyspace.mytable WHERE monthyear = '10-2023' AND key1 = 'my_key1_value'").explain();
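    Since monthyear and key1 together form the partition key, a migration job over many partitions typically enumerates the key1 values for each month and issues one fully-restricted query per pair. A minimal sketch (the helper class, method name, and key values are hypothetical, not part of the connector API):

    ```java
    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class PushdownQueryBuilder {

        // Builds a query that restricts BOTH partition key columns, so the
        // Spark Cassandra Connector can push both predicates down to Cassandra.
        static String buildQuery(String monthyear, String key1) {
            return "SELECT * FROM cassandra_old.mykeyspace.mytable"
                 + " WHERE monthyear = '" + monthyear + "'"
                 + " AND key1 = '" + key1 + "'";
        }

        public static void main(String[] args) {
            // Hypothetical key1 values; in practice they would come from an
            // application-side list or a separate lookup.
            List<String> keys = Arrays.asList("key-a", "key-b");
            List<String> queries = keys.stream()
                    .map(k -> buildQuery("10-2023", k))
                    .collect(Collectors.toList());
            // Each of these queries can then be passed to spark.sql(...).
            queries.forEach(System.out::println);
        }
    }
    ```

    Each generated string restricts the complete composite partition key, so the resulting plan should show both predicates under "Cassandra Filters" instead of a Spark-side Filter node.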