javaapache-sparkcassandraspark-cassandra-connectordata-partitioning

spark-cassandra-connector - repartitionByCassandraReplica returns empty RDD - Java


So, I have a 16 node cluster where every node has Spark and Cassandra installed while I am using the Spark-Cassandra Connector 3.0.0. I am trying to join a dataset with a cassandra table on the partition key, while also trying to use .repartitionByCassandraReplica.

However it seems I just get an empty rdd with 0 partitions(line 5 below)! Any ideas why?

Encoder<ExperimentForm> ExpEncoder = Encoders.bean(ExperimentForm.class);
//FYI experimentlist is a List<String>
Dataset<ExperimentForm> dfexplistoriginal = sp.createDataset(experimentlist, Encoders.STRING()).toDF("experimentid").as(ExpEncoder);
JavaRDD<ExperimentForm> predf = CassandraJavaUtil.javaFunctions(dfexplistoriginal.toJavaRDD()).repartitionByCassandraReplica("mdb","experiment",experimentlist.size(),CassandraJavaUtil.someColumns("experimentid"),CassandraJavaUtil.mapToRow(ExperimentForm.class));
System.out.println(predf.collect()); //Here it gives an empty dataset with 0 partitions

Dataset<ExperimentForm> newdfexplist =  sp.createDataset(predf.rdd(), ExpEncoder);
Dataset<Row> readydfexplist = newdfexplist.as(Encoders.STRING()).toDF("experimentid");

Dataset<Row> metlistinitial = sp.read().format("org.apache.spark.sql.cassandra")
                .options(new HashMap<String, String>() {
                    {
                        put("keyspace", "mdb");
                        put("table", "experiment");
                    }
                })
                .load().select(col("experimentid"), col("description"), col("intensity")).join(readydfexplist, "experimentid");

In case needed this is the experiment table in Cassandra:

CREATE TABLE experiment(
experimentid varchar,
description text,
rt float,
intensity float,
mz float,
identifier text,
chemical_formula text,
filename text,
PRIMARY KEY ((experimentid),description, rt, intensity, mz, identifier, chemical_formula, filename));

and this is the ExperimentForm class:

public class ExperimentForm {

    private String experimentid;

    public String getExperimentid() {
        return experimentid;
    }
    public void setExperimentid(String experimentid) {
        this.experimentid = experimentid;
    }
}

Let me know if you need any additional information.


Solution

  • The answer is basically the same as here Spark-Cassandra: repartitionByCassandraReplica or converting dataset to JavaRDD and back do not maintain number of partitions?

    Just had to do the repartitionByCassandraReplica and JoinWithCassandraTable on RDD and then convert back to dataset.