apache-spark, cassandra, spark-cassandra-connector

spark cassandra connector problem using catalogs


I am following the instructions found here to connect my Spark program to Cassandra and read data from it. Here is how I have configured Spark:

import org.apache.spark.sql.SparkSession

val configBuilder = SparkSession.builder
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  .config("spark.cassandra.connection.host", cassandraUrl)
  .config("spark.cassandra.connection.port", 9042)
  .config("spark.sql.catalog.myCatalogName", "com.datastax.spark.connector.datasource.CassandraCatalog")

val spark = configBuilder.getOrCreate()

According to the documentation, once this is done I should be able to query Cassandra like this:

spark.sql("select * from myCatalogName.myKeyspace.myTable where myPartitionKey = something")

However, when I do so I get the following error message:

mismatched input '.' expecting <EOF>(line 1, pos 43)

== SQL ==
select * from myCatalog.myKeyspace.myTable where myPartitionKey = something
----------------------------------^^^

However, when I use the following format, I am able to retrieve entries from Cassandra:

import org.apache.spark.sql.functions.col

val frame = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "myKeyspace", "table" -> "myTable"))
  .load()
  .filter(col("timestamp") > startDate && col("timestamp") < endDate)

However, this query performs a full table scan. The table contains a few million entries, so I would prefer to use the predicate pushdown functionality, which seems to be available only via the SQL API.

I am using spark-core_2.11:2.4.3, spark-cassandra-connector_2.11:2.5.0, and Cassandra 3.11.6.
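
For reference, these dependencies roughly correspond to the following build configuration. This is only a sketch: the use of sbt and the exact Scala 2.11 patch version are assumptions, and spark-sql is listed because SparkSession and the DataFrame API are used.

scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-core"                % "2.4.3",
  "org.apache.spark"   %% "spark-sql"                 % "2.4.3",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.5.0"
)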


Solution

  • The Catalogs API is available only in SCC version 3.0, which is not released yet (it will ship together with the Spark 3.0 release), so it isn't available in SCC 2.5.0. For 2.5.0 you need to register your table explicitly with create or replace temporary view ..., as described in the docs:

    spark.sql("""CREATE TEMPORARY VIEW myTable
         USING org.apache.spark.sql.cassandra
         OPTIONS (
         table "myTable",
         keyspace "myKeyspace",
         pushdown "true")""")
    

    Regarding pushdowns (they work the same way for all DataFrame APIs: SQL, Scala, Python, ...): such filtering is pushed down only when your timestamp is the first clustering column. Even in that case, a typical problem is specifying startDate and endDate as strings rather than timestamps. You can check by executing frame.explain and verifying that the predicate is pushed down - pushed predicates have a * marker next to the predicate name.

    For example,

    import org.apache.spark.sql.cassandra._

    val data = spark.read.cassandraFormat("sdtest", "test").load()
    val filtered = data.filter("ts >= cast('2019-03-10T14:41:34.373+0000' as timestamp) AND ts <= cast('2019-03-10T19:01:56.316+0000' as timestamp)")
    val not_filtered = data.filter("ts >= '2019-03-10T14:41:34.373+0000' AND ts <= '2019-03-10T19:01:56.316+0000'")
    

    The first filter expression will push the predicate down to Cassandra, while the second (not_filtered) will require a full scan.
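
    To verify which of the two variants is actually pushed down, you can print the physical plan for each DataFrame. This sketch simply calls the standard explain method and relies on the * marker described above:

    // Predicates handled by the connector appear in the Cassandra scan with a * marker;
    // predicates that Spark evaluates itself show up as a separate Filter step in the plan.
    filtered.explain()
    not_filtered.explain()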