I am using connector version com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3.
I have an RDD from a Kafka stream:
kafkaStream.foreachRDD((rdd: RDD[String]) => {
  if (rdd.count > 0) {
    println(java.time.LocalDateTime.now + ". Consumed: " + rdd.count() + " messages.")
    sqlContext.read.json(rdd)
      .select("count_metadata.tran_id")
      .write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "tmp", "keyspace" -> "kspace"))
      .mode(SaveMode.Append)
      .save()
  } else {
    println(java.time.LocalDateTime.now + ". There are currently no messages on the topic that haven't been consumed.")
  }
})
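A side note, unrelated to the missing rows: rdd.count is evaluated twice per batch above, which runs two Spark jobs over the same data. A minimal sketch of computing it once per batch (assuming the same kafkaStream and the same write as above):

kafkaStream.foreachRDD((rdd: RDD[String]) => {
  val messageCount = rdd.count()  // single pass over the batch
  if (messageCount > 0) {
    println(java.time.LocalDateTime.now + ". Consumed: " + messageCount + " messages.")
    // ... same read/select/write to Cassandra as above ...
  }
})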
The RDD count is around 40K, but the Spark connector consistently populates the table with only 457 records.
sqlContext.read.json(rdd).select("count_metadata.tran_id").count
also prints around 40K rows.
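One way to see where the rows go is to log both the total and the distinct tran_id count for each batch and compare them. A rough diagnostic sketch, assuming the same sqlContext and stream as above:

kafkaStream.foreachRDD((rdd: RDD[String]) => {
  if (rdd.count > 0) {
    val ids = sqlContext.read.json(rdd).select("count_metadata.tran_id")
    // If these two numbers differ, the key isn't actually unique in this batch.
    println("total tran_ids:    " + ids.count())
    println("distinct tran_ids: " + ids.distinct.count())
  }
})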
Here is my table statement:
cqlsh:kspace> CREATE TABLE tmp(tran_id text PRIMARY KEY);
The tran_id is unique for each message.
What am I missing? Why aren't all 40K records making it into that table?
My logs are not showing any exceptions either.
Update: above I claimed that the tran_id is unique for each message.
I lied:
println(df.distinct.count)  // df = sqlContext.read.json(rdd).select("count_metadata.tran_id")
prints....
457
Since tran_id is the only primary key column, every duplicate insert is just an upsert onto an existing row, which is exactly why 457 rows end up in the table. Time to raise this with our upstream source.
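For what it's worth, because Cassandra treats an insert with an existing primary key as an upsert, writing all 40K rows just rewrites the same 457 rows over and over. Until the upstream feed is fixed, the duplicates could be dropped before the write to avoid the redundant writes (the table contents come out the same). A minimal sketch of the same write with deduplication, under that assumption:

sqlContext.read.json(rdd)
  .select("count_metadata.tran_id")
  .dropDuplicates("tran_id")  // keep one row per key instead of upserting duplicates
  .write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "tmp", "keyspace" -> "kspace"))
  .mode(SaveMode.Append)
  .save()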