I have a streaming query running with the default trigger. My goal is to limit the volume read in each execution to avoid a huge micro-batch. Sometimes my Spark jobs are stopped for the whole weekend, so when I restart them the first micro-batch takes very long to finish. I also persist the DataFrames because the data is written to two databases. I tested two approaches.
The official docs say that maxOffsetsPerTrigger limits the number of offsets processed per trigger interval, but that did not seem to work for me. Did I misunderstand the meaning of this param?
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", "1")
.load()
Also, I read this answer, but I do not know where and how to set max.poll.records correctly. I tried passing it as an option on the readStream, with no success. Code below:
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("max.poll.records", "1")
.load()
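From what I can tell from the Kafka integration docs, Kafka's own consumer properties have to be prefixed with kafka. when passed as source options, so the unprefixed max.poll.records above is probably ignored. A sketch of what I believe the prefixed form looks like (the value 500 is just an example); even then, it should only tune how many records each internal poll fetches, not the size of the micro-batch:
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
// consumer configs need the "kafka." prefix; this does not cap the micro-batch size
.option("kafka.max.poll.records", "500")
.load()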
Main function:
override def execute(spark: SparkSession, args: Array[String]): Unit = {
val basePath: String = args(0)
val kafkaServers: String = args(1)
val kafkaTopic: String = args(2)
val checkpoint: String = args(3)
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.option("maxOffsetsPerTrigger", "1")
.load()
val transformed = read
.transform(applySchema)
.transform(conversions)
.transform(dropDuplicates)
.transform(partitioning)
val sink = new FileSystemSink(basePath)
val query = transformed
.writeStream
.outputMode(OutputMode.Append)
// hand each micro-batch to the sink, which writes it out to S3
.foreachBatch(sink.writeOnS3 _)
.option("checkpointLocation", f"$basePath/checkpoints/$checkpoint")
.start()
query.awaitTermination()
}
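As a side note, since the job currently runs with the default trigger: here is a sketch of the same writeStream with an explicit processing-time trigger (the 1 minute interval is only an illustrative value, not something from my job). Combined with maxOffsetsPerTrigger it bounds both how often a micro-batch starts and how much it reads:
import org.apache.spark.sql.streaming.Trigger
val query = transformed
.writeStream
.outputMode(OutputMode.Append)
.foreachBatch(sink.writeOnS3 _)
// start a micro-batch at most once per minute instead of as fast as possible
.trigger(Trigger.ProcessingTime("1 minute"))
.option("checkpointLocation", f"$basePath/checkpoints/$checkpoint")
.start()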
Besides the questions above, what is the correct way to limit the offsets read per micro-batch?
Spark Version: 2.4.5.
Update: I tested again and maxOffsetsPerTrigger worked just fine. I had misinterpreted the result of the trigger, and now it makes sense. The param means the total number of offsets read per trigger, not the offsets per partition; the limit is split proportionally across the topic's partitions.
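To illustrate with made-up numbers: with maxOffsetsPerTrigger set to 10000 on a topic with three partitions whose backlogs are 100000, 60000 and 40000 offsets, a micro-batch reads roughly 5000, 3000 and 2000 offsets from them, respectively. A more realistic version of the reader (the 10000 value is only an example) would be:
val read = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", kafkaTopic)
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
// total offsets per micro-batch across ALL partitions, split proportionally
// to each partition's backlog (not a per-partition limit)
.option("maxOffsetsPerTrigger", "10000")
.load()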