I am using connector version com.datastax.spark:spark-cassandra-connector_2.11:2.0.0-M3.
I have an RDD from a Kafka stream:
kafkaStream.foreachRDD((rdd: RDD[String]) => {
  if (rdd.count > 0) {
    println(java.time.LocalDateTime.now + ". Consumed: " + rdd.count() + " messages.");
      .options(Map("table" -> "tmp", "keyspace" -> "kspace"))
  } else {
    println(java.time.LocalDateTime.now + ". There are currently no messages on the topic that haven't been consumed.");
  }
})
The RDD count is around 40K, but the Spark connector consistently writes only 457 records to the database.
also prints 40k records.
Here is my table statement:
cqlsh:kspace> CREATE TABLE tmp(tran_id text PRIMARY KEY);
The tran_id is unique for each message.
What am I missing? Why aren't all 40k records making it to that table?
My logs are not showing any exceptions either.
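I said the tran_id is unique for each message. I lied: it is not. Because tran_id is the primary key, Cassandra treats writes with the same tran_id as upserts of a single row, which would explain why only 457 records end up in the table.
Time to raise it with our upstream source.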
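
For anyone hitting the same symptom, here is a minimal sketch of how repeated primary keys shrink the row count. It uses plain Scala collections rather than Spark or Cassandra, and the object name, helper names, and sample ids are all hypothetical, not taken from my actual job.

```scala
object DuplicateKeyCheck {
  // Number of rows a table keyed on tran_id would hold after writing every id:
  // a repeated key replaces the existing row instead of adding a new one.
  def rowsAfterUpsert(tranIds: Seq[String]): Int =
    tranIds.foldLeft(Map.empty[String, String]) { (table, id) =>
      table.updated(id, id)
    }.size

  // Keys that occur more than once, with how often each one occurs.
  def duplicates(tranIds: Seq[String]): Map[String, Int] =
    tranIds.groupBy(identity)
      .map { case (id, occurrences) => id -> occurrences.size }
      .filter { case (_, count) => count > 1 }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample; the real ids would come from the Kafka messages.
    val tranIds = Seq("a1", "b2", "a1", "c3", "b2", "a1")
    println(s"messages: ${tranIds.size}")
    println(s"rows after upsert: ${rowsAfterUpsert(tranIds)}")
    println(s"duplicated ids: ${duplicates(tranIds)}")
  }
}
```

On the real stream, comparing rdd.count() with the count of distinct tran_id values extracted from the messages should show the same gap, assuming the tran_id can be pulled out of each message string.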