I"m trying to make my code more efficient since i have to process billions of rows of data in cassandra. I currently use a JAVA loop within the Datastax Cassandra Spark Connector to pull out the data and put it into a format that I'm familiar with (multimap) in order to get spark to do the manipulation. I'd like to be able to replace this Multimap loop with a direct spark manipulation of the cassandra table to save time and make everything more efficient. I'd greatly appreciate any code suggestions to accomplish that. Here is my existing code:
Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
stmt.setFetchSize(2000000);
ResultSet results = session.execute(stmt);

// Channel bucket bounds and step
double channel_end = 1.6159E8;
double increment = 5000;
double start_frequency, bandwidth, channel;
float power;

// Get the variables from each row of Cassandra data
Multimap<Double, Float> data = LinkedListMultimap.create();
for (Row row : results) {
    // Column names in Cassandra (case sensitive)
    start_frequency = row.getDouble("Start_Frequency");
    power = row.getFloat("Power");
    bandwidth = row.getDouble("Bandwidth");
    // Create channel power buckets
    for (channel = 1.6000E8; channel <= channel_end; channel += increment) {
        if (channel >= start_frequency && channel <= (start_frequency + bandwidth)) {
            data.put(channel, power);
        }
    }
}
// Create Spark list for DataFrame
List<Value> values = data.asMap().entrySet()
        .stream()
        .flatMap(x -> x.getValue()
                .stream()
                .map(y -> new Value(x.getKey(), y)))
        .collect(Collectors.toList());

// Create DataFrame and calculate results
sqlContext.createDataFrame(sc.parallelize(values), Value.class)
        .groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        .write().mode(SaveMode.Append)
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra")
        .save();
} // end session
} // End Compute
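The Multimap loop can be dropped entirely by letting the connector read the table straight into a JavaRDD of MeasuredValue beans and building the channel buckets inside a flatMap, so the bucketing runs distributed on the executors instead of in a driver-side loop: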
JavaRDD<MeasuredValue> rdd = javaFunctions(sc)
        .cassandraTable("SB1000_47130646", "Measured_Value", mapRowTo(MeasuredValue.class));

JavaRDD<Value> valueRdd = rdd.flatMap(new FlatMapFunction<MeasuredValue, Value>() {
    @Override
    public Iterable<Value> call(MeasuredValue row) throws Exception {
        double start_frequency = row.getStart_frequency();
        float power = row.getPower();
        double bandwidth = row.getBandwidth();

        // Define and initialize variables
        double channel;
        double channel_end = 1.6159E8;
        double increment = 5000;

        List<Value> list = new ArrayList<Value>();
        // Create channel power buckets
        for (channel = 1.6000E8; channel <= channel_end; channel += increment) {
            if (channel >= start_frequency && channel <= (start_frequency + bandwidth)) {
                list.add(new Value(channel, power));
            }
        }
        return list;
    }
});
sqlContext.createDataFrame(valueRdd, Value.class)
        .groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        .write().mode(SaveMode.Append)
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra")
        .save();
} // end session
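For reference, the table can also be loaded directly as a DataFrame through the connector's data source, mirroring the write() call above. This is only a sketch assuming Spark 1.x and the Spark Cassandra Connector; the per-channel bucketing would still need the flatMap shown above (or a UDF) before the groupBy:

// Load the Cassandra table straight into a DataFrame (no driver-side loop)
DataFrame measured = sqlContext.read()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", "SB1000_47130646")
        .option("table", "Measured_Value")
        .load()
        .select(col("Start_Frequency"), col("Power"), col("Bandwidth"));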
public static class MeasuredValue implements Serializable {

    public MeasuredValue() { }

    private double start_frequency;
    public double getStart_frequency() { return start_frequency; }
    public void setStart_frequency(double start_frequency) { this.start_frequency = start_frequency; }

    private double bandwidth;
    public double getBandwidth() { return bandwidth; }
    public void setBandwidth(double bandwidth) { this.bandwidth = bandwidth; }

    private float power;
    public float getPower() { return power; }
    public void setPower(float power) { this.power = power; }
}
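The Value bean isn't shown in the snippets above; createDataFrame(..., Value.class), the groupBy on "channel", and the aggregates on "power" imply a simple serializable bean along these lines (field and accessor names are assumed):

public static class Value implements Serializable {

    private double channel;
    private float power;

    public Value() { }
    public Value(double channel, float power) {
        this.channel = channel;
        this.power = power;
    }

    public double getChannel() { return channel; }
    public void setChannel(double channel) { this.channel = channel; }
    public float getPower() { return power; }
    public void setPower(float power) { this.power = power; }
}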