apache-kafkacassandraspark-streamingspark-cassandra-connectorspark-streaming-kafka

java.io.IOException: Failed to write statements to batch_layer.test. The latest exception was Key may not be empty


I am trying to count the number of words in the text and save result to the Cassandra database. Producer reads the data from the file and sends it to kafka. Consumer uses spark streaming to read and process the date,and then sends the result of the calculations to the table.

My producer looks like this:

object ProducerPlayground extends App {

  val topicName = "test"
  private def createProducer: Properties = {
    val producerProperties = new Properties()
    producerProperties.setProperty(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      "localhost:9092"
    )
    producerProperties.setProperty(
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      classOf[IntegerSerializer].getName
    )
    producerProperties.setProperty(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      classOf[StringSerializer].getName
    )
    producerProperties
  }

  val producer = new KafkaProducer[Int, String](createProducer)

  val source = Source.fromFile("G:\\text.txt", "UTF-8")

  val lines = source.getLines()

  var key = 0
  for (line <- lines) {
    producer.send(new ProducerRecord[Int, String](topicName, key, line))
    key += 1
  }
  source.close()
  producer.flush()

}

Consumer looks like this:

object BatchLayer {
  def main(args: Array[String]) {

    val brokers = "localhost:9092"
    val topics = "test"
    val groupId = "groupId-1"

    val sparkConf = new SparkConf()
      .setAppName("BatchLayer")
      .setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    val sc = ssc.sparkContext
    sc.setLogLevel("OFF")

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
    )
    val stream =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
      )

   
    val cass = CassandraConnector(sparkConf)

    cass.withSessionDo { session =>
      session.execute(
        s"CREATE KEYSPACE IF NOT EXISTS batch_layer WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }"
      )
      session.execute(s"CREATE TABLE IF NOT EXISTS batch_layer.test (key VARCHAR PRIMARY KEY, value INT)")
      session.execute(s"TRUNCATE batch_layer.test")
    }

    stream
      .map(v => v.value())
      .flatMap(x => x.split(" "))
      .filter(x => !x.contains(Array('\n', '\t')))
      .map(x => (x, 1))
      .reduceByKey(_ + _)
      .saveToCassandra("batch_layer", "test", SomeColumns("key", "value"))

    ssc.start()
    ssc.awaitTermination()
  }

}

After starting producer, the program stops working with this error. What did I do wrong ?


Solution

  • It makes very little sense to use legacy streaming in 2021st - it's very cumbersome to use, and you also need to track offsets for Kafka, etc. It's better to use Structured Streaming instead - it will track offsets for your through the checkpoints, you will work with high-level Dataset APIs, etc.

    In your case code could look as following (didn't test, but it's adopted from this working example):

    val streamingInputDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .load()
    
    val wordsCountsDF = streamingInputDF.selectExpr("CAST(value AS STRING) as value")
      .selectExpr("split(value, '\\w+', -1) as words")
      .selectExpr("explode(words) as word")
      .filter("word != ''")
      .groupBy($"word")
      .count()
      .select($"word", $"count")
    
    // create table ...
    
    val query = wordsCountsDF.writeStream
       .outputMode(OutputMode.Update)
       .format("org.apache.spark.sql.cassandra")
       .option("checkpointLocation", "path_to_checkpoint)
       .option("keyspace", "test")
       .option("table", "<table_name>")
       .start()
    
    query.awaitTermination()
    

    P.S. In your example, most probable error is that you're trying to use .saveToCassandra directly on DStream - it doesn't work this way.