apache-kafkaspark-skinning

stopping spark streaming after reading first batch of data


I am using spark streaming to consume kafka messages. I want to get some messages as sample from kafka instead of reading all messages. So I want to read a batch of messages, return them to caller and stopping spark streaming. Currently I am passing batchInterval time in awaitTermination method of spark streaming context method. I don't now how to return processed data to caller from spark streaming. Here is my code that I am using currently

def getsample(params: scala.collection.immutable.Map[String, String]): Unit = {
    if (params.contains("zookeeperQourum"))
      zkQuorum = params.get("zookeeperQourum").get
    if (params.contains("userGroup"))
      group = params.get("userGroup").get
    if (params.contains("topics"))
      topics = params.get("topics").get
    if (params.contains("numberOfThreads"))
      numThreads = params.get("numberOfThreads").get
    if (params.contains("sink"))
      sink = params.get("sink").get
    if (params.contains("batchInterval"))
      interval = params.get("batchInterval").get.toInt
    val sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("spark://cloud2-server:7077")
    val ssc = new StreamingContext(sparkConf, Seconds(interval))
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    var consumerConfig = scala.collection.immutable.Map.empty[String, String]
    consumerConfig += ("auto.offset.reset" -> "smallest")
    consumerConfig += ("zookeeper.connect" -> zkQuorum)
    consumerConfig += ("group.id" -> group)
    var data = KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, consumerConfig, topicMap, StorageLevel.MEMORY_ONLY).map(_._2)
    val streams = data.window(Seconds(interval), Seconds(interval)).map(x => new String(x))
    streams.foreach(rdd => rdd.foreachPartition(itr => {
      while (itr.hasNext && size >= 0) {
        var msg=itr.next
        println(msg)
        sample.append(msg)
        sample.append("\n")
        size -= 1
      }
    }))
    ssc.start()
    ssc.awaitTermination(5000)
    ssc.stop(true)
  }

So instead of saving messages in a String builder called "sample" I want to return to caller.


Solution

  • You can implement a StreamingListener and then inside it, onBatchCompleted you can call ssc.stop()

    private class MyJobListener(ssc: StreamingContext) extends StreamingListener {
    
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
    
        ssc.stop(true)
    
      }
    
    }
    

    This is how you attach your SparkStreaming to the JobListener:

    val listen = new MyJobListener(ssc)
    ssc.addStreamingListener(listen)
    
    ssc.start()
    ssc.awaitTermination()