scala, apache-spark, apache-kafka, spark-streaming, spark-streaming-kafka

How to write spark streaming DF to Kafka topic


I am using Spark Streaming to process data between two Kafka queues, but I cannot seem to find a good way to write to Kafka from Spark. I have tried this:

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        // A new producer is created for every single record
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)

It works as intended, but instantiating a new KafkaProducer for every message is clearly infeasible in a real setting, and I'm trying to work around it.

I would like to keep a reference to a single producer instance per process and access it whenever I need to send a message. How can I write to Kafka from Spark Streaming?


Solution

  • My first advice would be to create one producer per partition inside foreachPartition and measure whether that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests); see the first sketch after this answer.

    Another option is to use an object pool as illustrated in this example:

    https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala

    However, I found it hard to implement when using checkpointing.

    Another version that is working well for me is a producer factory, as described in the following blog post (a second sketch below illustrates the general idea); you just have to check whether it provides enough parallelism for your needs (see the comments section):

    http://allegro.tech/2015/08/spark-kafka-integration.html
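    A minimal sketch of the first option, assuming (as in the question) that input is a DStream[String], brokers holds the broker list, and the target topic is "output". One producer is created per partition per batch instead of one per record:

    import java.util.Properties

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    input.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // One producer per partition per batch, instead of one per record
        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)

        partition.foreach { record =>
          producer.send(new ProducerRecord[String, String]("output", null, record))
        }

        // Flush buffered records and release the producer's resources
        producer.close()
      }
    }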
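    For the single-instance-per-process idea, a rough sketch (not the blog post's exact code) is a lazily initialized producer held in a Scala object, so each executor JVM creates it once and reuses it across batches; KafkaSink is a hypothetical helper name:

    import java.util.Properties

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    // Hypothetical helper: a Scala object is a per-JVM singleton, so each
    // executor creates the producer once and reuses it across batches
    object KafkaSink {
      private var producer: KafkaProducer[String, String] = _

      def get(brokers: String): KafkaProducer[String, String] = synchronized {
        if (producer == null) {
          val props = new Properties()
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer")
          producer = new KafkaProducer[String, String](props)
          // Close the producer when the executor JVM shuts down
          sys.addShutdownHook { producer.close() }
        }
        producer
      }
    }

    input.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val producer = KafkaSink.get(brokers)
        partition.foreach { record =>
          producer.send(new ProducerRecord[String, String]("output", null, record))
        }
      }
    }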