
Spark Cassandra Connector 3.0.0 - How to enable DirectJoin - Java


I have Cassandra 3.11.9, Spark 3.0.1 and spark-cassandra-connector 3.0.0 (as a dependency). I am trying to use the Direct Join feature of SCC 3.0.0, but when I join the dataset below, the physical plan shows Spark's broadcast hash join instead.

// Read the Cassandra table mdb.experiment and join it with dfexplist on experimentid.
Dataset<Row> metlistinitial = sp.read().format("org.apache.spark.sql.cassandra")
            .option("keyspace", "mdb")
            .option("table", "experiment")
            .load().select(col("experimentid"), col("description"))
            .join(dfexplist, "experimentid")
            .filter(col("description").notEqual("Unidentified"));
// Print the physical plan to see which join strategy was chosen.
metlistinitial.explain();

== Physical Plan ==
*(1) Project [experimentid#6, description#7]
+- *(1) BroadcastHashJoin [experimentid#6], [experimentid#4], Inner, BuildRight
   :- *(1) Project [experimentid#6, description#7]
   :  +- *(1) Filter NOT (description#7 = Unidentified)
   :     +- BatchScan[experimentid#6, description#7] Cassandra Scan: mdb.experiment
 - Cassandra Filters: []
 - Requested Columns: [experimentid,description]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true])), [id=#19]
      +- LocalTableScan [experimentid#4]

Is there something I should do to enable the direct join with a Cassandra table? Right now the join takes around 8 minutes, and I want to see whether the direct join is faster.


Solution

  • Just found it! I only had to activate the connector's Spark SQL extensions by adding

    .config("spark.sql.extensions","com.datastax.spark.connector.CassandraSparkExtensions")

    to the Spark configuration on the SparkSession builder. Amazing performance: the join now takes only 8 seconds!
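
    To confirm that the setting took effect, it may help to check the plan text rather than eyeball it. Below is a minimal sketch of such a check in plain Java. `JoinPlanCheck` and `JoinKind` are hypothetical names, not part of Spark or the connector, and the marker string "Cassandra Direct Join" is an assumption about how the connector labels its direct join node in the physical plan; adjust it if your plan prints something different.

```java
// Hypothetical helper (not part of Spark or the connector): classifies the
// join strategy from the text of a physical plan.
final class JoinPlanCheck {

    enum JoinKind { CASSANDRA_DIRECT_JOIN, BROADCAST_HASH_JOIN, OTHER }

    // Assumption: the connector's direct join shows up in the plan as a node
    // whose label contains this text.
    private static final String DIRECT_JOIN_MARKER = "Cassandra Direct Join";

    // Node name taken from the plan shown in the question.
    private static final String BROADCAST_MARKER = "BroadcastHashJoin";

    static JoinKind detect(String physicalPlan) {
        if (physicalPlan == null) {
            return JoinKind.OTHER;
        }
        // Check the direct join first: it is the strategy we hope to see.
        if (physicalPlan.contains(DIRECT_JOIN_MARKER)) {
            return JoinKind.CASSANDRA_DIRECT_JOIN;
        }
        if (physicalPlan.contains(BROADCAST_MARKER)) {
            return JoinKind.BROADCAST_HASH_JOIN;
        }
        return JoinKind.OTHER;
    }

    public static void main(String[] args) {
        // First lines of the plan from the question (extensions not enabled).
        String planFromQuestion = String.join("\n",
                "*(1) Project [experimentid#6, description#7]",
                "+- *(1) BroadcastHashJoin [experimentid#6], [experimentid#4], Inner, BuildRight");
        System.out.println(detect(planFromQuestion)); // prints BROADCAST_HASH_JOIN
    }
}
```

    In the application, the plan text could be obtained with metlistinitial.queryExecution().executedPlan().toString() and passed to detect(...). This only inspects the plan string; it does not measure whether the direct join is actually faster for a given table.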