We need to use maxOffsetsPerTrigger
in the Kafka source with Trigger.Once()
in Structured Streaming, but based on this issue it seems that Spark 3 reads allAvailable
instead. Is there a way to achieve rate limiting in this situation?
Here is sample code in Spark 3:
import org.apache.spark.sql.streaming.Trigger

def options: Map[String, String] = Map(
  "kafka.bootstrap.servers" -> conf.getStringSeq("bootstrapServers").mkString(","),
  "subscribe" -> conf.getString("topic")
) ++
  Option(conf.getLong("maxOffsetsPerTrigger")).map("maxOffsetsPerTrigger" -> _.toString)

val streamingQuery = sparkSession.readStream
  .format("kafka")
  .options(options)
  .load()
  .writeStream
  .format("console") // placeholder sink; the original snippet omitted one
  .trigger(Trigger.Once())
  .start()
There is no direct way around it to properly set a rate limit. Since maxOffsetsPerTrigger
is not applied to streaming jobs with the Once
trigger, you could do one of the following to achieve a similar result:
The first option would be to choose another trigger, use maxOffsetsPerTrigger
to limit the rate, and kill the job manually after it has finished processing all data.
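A minimal sketch of that approach, reusing sparkSession and options from the question; the console sink, the 30-second interval, and the rule of stopping after one empty micro-batch are illustrative assumptions, not a definitive recipe:

import org.apache.spark.sql.streaming.Trigger

// Run with a periodic trigger so maxOffsetsPerTrigger is honoured,
// then stop the query by hand once the topic is drained.
val query = sparkSession.readStream
  .format("kafka")
  .options(options)
  .load()
  .writeStream
  .format("console") // placeholder sink for illustration
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

// Treat one micro-batch with no input rows as "caught up"; this is a
// simplification, production code would want a more robust check.
var drained = false
while (!drained) {
  Thread.sleep(30000)
  val progress = query.lastProgress // null until the first batch completes
  if (progress != null && progress.numInputRows == 0) {
    drained = true
    query.stop()
  }
}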
The second option would be to use the startingOffsets
and endingOffsets
options and make the job a batch job, repeating until you have processed all data within the topic. However, there is a reason why "Streaming in RunOnce mode is better than Batch", as detailed here.
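A sketch of the batch variant, under the assumption that you track the consumed offsets yourself between runs; the topic name and offset numbers in the JSON are hypothetical placeholders, as is the output path:

// Bounded batch read: startingOffsets/endingOffsets accept per-partition
// JSON, so each run processes a fixed slice of the topic.
val batchDf = sparkSession.read
  .format("kafka")
  .option("kafka.bootstrap.servers", conf.getStringSeq("bootstrapServers").mkString(","))
  .option("subscribe", conf.getString("topic"))
  .option("startingOffsets", """{"my-topic":{"0":0,"1":0}}""")     // hypothetical offsets
  .option("endingOffsets", """{"my-topic":{"0":5000,"1":5000}}""") // hypothetical offsets
  .load()

batchDf.write.format("parquet").save("/tmp/output") // placeholder sink
// Persist the ending offsets, then repeat with the next slice until
// you reach the head of the topic.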
The last option would be to look into the linked pull request and compile Spark on your own.