I have Cassandra 3.11.9, Spark 3.0.1 and spark-cassandra-connector 3.0.0 as a dependency. I am trying to use the Direct Join of SCC 3.0.0, but when I run the join on the dataset below I get Spark's broadcast hash join instead.
Dataset<Row> metlistinitial = sp.read().format("org.apache.spark.sql.cassandra")
        .options(new HashMap<String, String>() {
            {
                put("keyspace", "mdb");
                put("table", "experiment");
            }
        })
        .load()
        .select(col("experimentid"), col("description"))
        // dfexplist is a local Dataset<Row> of experimentid values (the LocalTableScan in the plan below)
        .join(dfexplist, "experimentid")
        .filter(col("description").notEqual("Unidentified"));

metlistinitial.explain();
== Physical Plan ==
*(1) Project [experimentid#6, description#7]
+- *(1) BroadcastHashJoin [experimentid#6], [experimentid#4], Inner, BuildRight
   :- *(1) Project [experimentid#6, description#7]
   :  +- *(1) Filter NOT (description#7 = Unidentified)
   :     +- BatchScan[experimentid#6, description#7] Cassandra Scan: mdb.experiment
   :           - Cassandra Filters: []
   :           - Requested Columns: [experimentid,description]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true])), [id=#19]
      +- LocalTableScan [experimentid#4]
Is there something I should do to enable the direct join with a Cassandra table? Right now the join takes around 8 minutes, and I want to see whether it would be faster with the direct join.
Just found it! It seems I just had to activate the connector's extensions by adding
.config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
to the Spark configuration. Amazing performance: the join now takes only 8 seconds!
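
For reference, a minimal sketch of where that setting goes when building the session (the app name and Cassandra host are placeholders, and `sp` is the session variable used in the question):

import org.apache.spark.sql.SparkSession;

// Sketch only: the connection host and app name are placeholders for your own setup.
SparkSession sp = SparkSession.builder()
        .appName("direct-join-example")
        .config("spark.cassandra.connection.host", "127.0.0.1")
        // Registering the Catalyst extensions is what lets the connector replace
        // the broadcast hash join with its Direct Join against Cassandra.
        .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
        .getOrCreate();

With the extensions registered, explain() should show a Cassandra Direct Join node in place of the BroadcastHashJoin above.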