Tags: apache-spark, apache-kafka, spark-structured-streaming, spark-kafka-integration

Spark 3 Structured Streaming: using maxOffsetsPerTrigger in the Kafka source with Trigger.Once


We need to use maxOffsetsPerTrigger in the Kafka source with Trigger.Once() in Structured Streaming, but based on this issue it seems that Spark 3 reads allAvailable instead. Is there a way to achieve rate limiting in this situation?

Here is sample code in Spark 3:

import org.apache.spark.sql.streaming.Trigger

// maxOffsetsPerTrigger is only added to the options when it is configured
def options: Map[String, String] = Map(
  "kafka.bootstrap.servers" -> conf.getStringSeq("bootstrapServers").mkString(","),
  "subscribe" -> conf.getString("topic")
) ++
  Option(conf.getLong("maxOffsetsPerTrigger")).map("maxOffsetsPerTrigger" -> _.toString)

val streamingQuery = sparkSession.readStream
  .format("kafka")
  .options(options)
  .load()
  .writeStream
  .format("console") // a sink format is required before start(); console is a stand-in
  .trigger(Trigger.Once())
  .start()

Solution

  • There is no other way to properly set a rate limit. Since maxOffsetsPerTrigger is not applied to streaming jobs with the Once trigger, you could do one of the following to achieve an equivalent result:

    1. Choose another trigger (such as ProcessingTime), use maxOffsetsPerTrigger to limit the rate, and stop the job manually once it has finished processing all data (see the first sketch after this list).

    2. Use the startingOffsets and endingOffsets options and run the job as a batch job (see the second sketch after this list). Repeat until you have processed all data within the topic. Note, however, that there is a reason why "Streaming in RunOnce mode is better than Batch", as detailed here.

    A last option would be to look into the linked pull request and compile Spark yourself.
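As a minimal sketch of the first workaround: the query below uses a recurring ProcessingTime trigger, under which maxOffsetsPerTrigger is honored, and is stopped manually once a micro-batch reads no new rows. The bootstrap servers, topic, paths, offset limit, and intervals are placeholder values, not taken from the question:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("rate-limited-catchup").getOrCreate()

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "my-topic")                     // placeholder
  .option("maxOffsetsPerTrigger", "10000")             // honored by recurring triggers
  .load()
  .writeStream
  .format("parquet")
  .option("path", "/data/out")                         // placeholder
  .option("checkpointLocation", "/data/checkpoints")   // placeholder
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Simple heuristic: stop once a completed micro-batch processed no new rows.
// lastProgress is null before the first batch completes, hence the Option wrapper.
while (query.isActive) {
  Thread.sleep(30000)
  if (Option(query.lastProgress).exists(_.numInputRows == 0)) query.stop()
}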
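And a sketch of the second workaround: run bounded batch reads over explicit offset ranges. The per-partition offset JSON below is illustrative; in practice you would compute each next range from the offsets you last processed (in this JSON, -2 means earliest and -1 means latest):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("bounded-batch-read").getOrCreate()

// Batch read of one bounded offset slice; repeat with the next slice until
// the whole topic has been processed.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "my-topic")                     // placeholder
  .option("startingOffsets", """{"my-topic":{"0":0,"1":0}}""")
  .option("endingOffsets", """{"my-topic":{"0":10000,"1":10000}}""")
  .load()

df.write.mode("append").parquet("/data/out")           // placeholder sink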