I tried to read data from Kafka with the Spark Structured Streaming APIs and write the result to S3 as a Delta table. It is important to me to put fewer objects on S3, so I used coalesce(2) to create two objects per batch. I also want to run the job every 3 hours.
df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", cf.kafka_bootstrap_servers)
      .option("subscribe", cf.topic_name)
      .option("startingOffsets", cf.starting_offsets)
      .option("maxOffsetsPerTrigger", cf.max_batch)
      .option("minOffsetsPerTrigger", cf.min_batch)
      .option("failOnDataLoss", cf.fail_on_data_loss)
      .option("maxTriggerDelay", cf.max_trigger_delay)
      .load()
)
### some transformation here ###
df = df.coalesce(2)
query = df.writeStream \
    .format("delta") \
    .partitionBy("created_date") \
    .outputMode("append") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", cf.checkpoints_location) \
    .trigger(availableNow=True) \
    .start(cf.sink_location)
query.awaitTermination()
As you can see in the code, I set minOffsetsPerTrigger and maxOffsetsPerTrigger, and I used the availableNow trigger in the query.
Based on the documentation:
Available-now micro-batch: similar to the one-time micro-batch trigger, the query will process all the available data and then stop on its own. The difference is that it will process the data in (possibly) multiple micro-batches based on the source options (e.g. maxFilesPerTrigger for the file source), which results in better query scalability.
The maxBytesPerTrigger property determines the maximum size of each micro-batch. With availableNow this does not mean a single micro-batch; the run can involve multiple micro-batches if there is more data to process.
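To check how the availableNow run is actually split into micro-batches, the per-batch progress that Spark keeps for the query can be inspected once it stops. A minimal sketch using the standard StreamingQuery progress API (query is the object started in the code above):

# After query.awaitTermination() returns, each entry in recentProgress
# describes one micro-batch of the availableNow run.
for progress in query.recentProgress:
    print(progress["batchId"],                   # micro-batch number within the run
          progress["numInputRows"],              # rows read from Kafka in this batch
          progress["sources"][0]["endOffset"])   # Kafka offsets at the end of the batch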
It started to consume data from Kafka in micro-batches, but it only stored the last batch in S3: after it finished, I could see the last batch of data, and the previous batches were missing.
Update:
It seems that no objects are created on S3 while the micro-batches are being processed; the output only appears after the run has finished.
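A more reliable way to see what has actually been committed than listing S3 objects is to read the Delta transaction log. A minimal sketch, assuming the delta-spark package is installed and cf.sink_location is the same sink path as above:

from delta.tables import DeltaTable

# Every micro-batch that committed successfully shows up as a STREAMING UPDATE
# entry in the table history, regardless of how the data files look on S3.
history = DeltaTable.forPath(spark, cf.sink_location).history()
history.select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)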
I found the problem; it was not related to the trigger. In the transformations, there is a from_avro function for deserializing the Kafka messages. Unfortunately, there had been a schema change, and from_avro could not parse the old messages.
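One way to stop a schema change like this from failing the whole run is to parse with from_avro in PERMISSIVE mode. A minimal sketch, assuming schema_str holds the reader schema as a JSON string (from_avro accepts a "mode" option in its options map; the exact behaviour may vary by Spark version):

from pyspark.sql.avro.functions import from_avro

# With mode=PERMISSIVE, records that cannot be decoded against schema_str come
# back with null fields instead of throwing and failing the micro-batch, which
# makes the schema mismatch visible in the output rather than in a stack trace.
decoded = df.select(
    from_avro(df.value, schema_str, {"mode": "PERMISSIVE"}).alias("payload")
)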