So, I have a 16 node cluster where every node has Spark and Cassandra installed while I am using the Spark-Cassandra Connector 3.0.0. I am trying to join a dataset with a cassandra table on the partition key, while also trying to use .repartitionByCassandraReplica.
However it seems I just get an empty rdd with 0 partitions(line 5 below)! Any ideas why?
Encoder<ExperimentForm> ExpEncoder = Encoders.bean(ExperimentForm.class);
//FYI experimentlist is a List<String>
Dataset<ExperimentForm> dfexplistoriginal = sp.createDataset(experimentlist, Encoders.STRING()).toDF("experimentid").as(ExpEncoder);
JavaRDD<ExperimentForm> predf = CassandraJavaUtil.javaFunctions(dfexplistoriginal.toJavaRDD()).repartitionByCassandraReplica("mdb","experiment",experimentlist.size(),CassandraJavaUtil.someColumns("experimentid"),CassandraJavaUtil.mapToRow(ExperimentForm.class));
System.out.println(predf.collect()); //Here it gives an empty dataset with 0 partitions
Dataset<ExperimentForm> newdfexplist = sp.createDataset(predf.rdd(), ExpEncoder);
Dataset<Row> readydfexplist = newdfexplist.as(Encoders.STRING()).toDF("experimentid");
Dataset<Row> metlistinitial = sp.read().format("org.apache.spark.sql.cassandra")
.options(new HashMap<String, String>() {
{
put("keyspace", "mdb");
put("table", "experiment");
}
})
.load().select(col("experimentid"), col("description"), col("intensity")).join(readydfexplist, "experimentid");
In case needed this is the experiment table in Cassandra:
CREATE TABLE experiment(
experimentid varchar,
description text,
rt float,
intensity float,
mz float,
identifier text,
chemical_formula text,
filename text,
PRIMARY KEY ((experimentid),description, rt, intensity, mz, identifier, chemical_formula, filename));
and this is the ExperimentForm class:
public class ExperimentForm {
private String experimentid;
public String getExperimentid() {
return experimentid;
}
public void setExperimentid(String experimentid) {
this.experimentid = experimentid;
}
}
Let me know if you need any additional information.
The answer is basically the same as here Spark-Cassandra: repartitionByCassandraReplica or converting dataset to JavaRDD and back do not maintain number of partitions?
Just had to do the repartitionByCassandraReplica and JoinWithCassandraTable on RDD and then convert back to dataset.