Basically, I have this:
# assumes: from pyspark.sql import functions as f
streaming_df = (
    self.spark.readStream.format("delta")
    .option("withEventTimeOrder", "true")
    .option("skipChangeCommits", "true")
    .option(
        "badRecordsPath",
        f"{self.badrecords_path}/",
    )
    .table(f"{self.catalog_name}.bronze.measurements{self.postfix}")
    # event-time watermark: tolerate data up to 72 hours late
    .withWatermark("startTimestamp", "72 hours")
    .filter((f.col("quality") == "Good") & (f.col("value").isNotNull()))
    .dropDuplicates(["startTimestamp", "id"])
    # ---- other transformations
)
As you can see, my watermark is set to 72 hours. So I would expect the latest records from today, i.e. 2023-07-18, not to be in the destination yet (the streaming DataFrame is written to a destination Delta Lake table).
In case you are interested, this is how the destination table is written to:
ws = (
    streaming_df.writeStream.queryName("meas_silver_smuk")
    .foreachBatch(
        lambda df, epoch_id: process_func(
            df=df,
            epoch_id=epoch_id,
        )
    )
    .option("checkpointLocation", f"{checkpoint_path}")
    .trigger(**trigger_options)
    .start()
)
ws.awaitTermination()
But when I query the destination, it does have data for 2023-07-18. These are the query results from the destination:
Why is 2023-07-18 there?
As I read the documentation, the condition for the watermark should be:
max startTimestamp seen - watermark period > end of the time window
And my time window is the processing trigger, which runs every 10 minutes.
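To make that rule concrete, here is a minimal sketch of the arithmetic in plain Python. The timestamps are made up for illustration, and the "window" here is assumed to be an event-time window as used in a windowed aggregation, not the processing trigger interval. The helper names are hypothetical and not part of any Spark API.

```python
from datetime import datetime, timedelta

# Same threshold as withWatermark("startTimestamp", "72 hours")
WATERMARK_DELAY = timedelta(hours=72)


def current_watermark(max_event_time: datetime) -> datetime:
    """Watermark = latest event time seen so far minus the allowed delay."""
    return max_event_time - WATERMARK_DELAY


def window_is_final(window_end: datetime, watermark: datetime) -> bool:
    """An event-time window can be finalized once the watermark has passed its end."""
    return watermark > window_end


# Hypothetical latest startTimestamp seen by the stream
max_seen = datetime(2023, 7, 18, 12, 0)
wm = current_watermark(max_seen)

old_window_end = datetime(2023, 7, 15, 0, 0)     # window that ended before the watermark
todays_window_end = datetime(2023, 7, 19, 0, 0)  # window that contains today's rows

print(wm)                                      # 2023-07-15 12:00:00
print(window_is_final(old_window_end, wm))     # True
print(window_is_final(todays_window_end, wm))  # False
```

Under these assumptions, only a windowed aggregate covering 2023-07-18 would be held back; the rule says nothing about holding back individual rows.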
Can someone please help me understand? Does the stream still write to the destination while the watermark period is still open, and only stop the operation once the watermark has passed?
The stream writes to the destination in append mode,
and as I read the documentation, the output should only be written once the watermark threshold has passed:
I could also use dropDuplicatesWithinWatermark and specify the id column, but first I need to understand what is happening here.
If you have any questions, I am happy to answer them.
I think what you describe about when the output is written only holds when you use a watermark with aggregations. For dropDuplicates there is no reason to wait 72 hours: each unique entry is written immediately, and any later duplicates (within 72 hours) are dropped. Entries older than 72 hours are also removed from the state. With aggregations in append mode, Spark has to wait out those 72 hours, because any data arriving within that interval may still change the aggregated value. The documentation for dropping duplicates with a watermark does not mention waiting for the watermark to pass before writing data; it only says that duplicates are dropped and that duplicates are checked for within the watermark interval.
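The behaviour described above can be illustrated with a small plain-Python model. This is only a toy sketch under simplifying assumptions, not Spark's actual implementation: the class `WatermarkDedup` and its method names are hypothetical, rows are `(id, startTimestamp)` tuples, and the watermark is advanced at the end of each micro-batch and applied to the next one.

```python
from datetime import datetime, timedelta


class WatermarkDedup:
    """Toy model of withWatermark(...) followed by dropDuplicates([...])."""

    def __init__(self, delay: timedelta):
        self.delay = delay
        self.max_event_time = None  # latest event time seen in earlier batches
        self.seen = {}              # dedup state: (ts, id) -> ts

    def process_batch(self, rows):
        # The watermark used for this batch comes from previous batches.
        watermark = None
        if self.max_event_time is not None:
            watermark = self.max_event_time - self.delay

        emitted = []
        for row_id, ts in rows:
            if watermark is not None and ts < watermark:
                continue  # older than the watermark: dropped as too late
            key = (ts, row_id)
            if key in self.seen:
                continue  # duplicate inside the watermark interval: dropped
            self.seen[key] = ts
            emitted.append((row_id, ts))  # first occurrence: emitted right away

        # Advance the watermark and evict state that fell behind it.
        for _, ts in rows:
            if self.max_event_time is None or ts > self.max_event_time:
                self.max_event_time = ts
        if self.max_event_time is not None:
            new_watermark = self.max_event_time - self.delay
            self.seen = {k: t for k, t in self.seen.items() if t >= new_watermark}
        return emitted


today = datetime(2023, 7, 18, 9, 0)
dedup = WatermarkDedup(timedelta(hours=72))

# Today's rows are emitted in the very first batch; nothing waits 72 hours.
batch1 = dedup.process_batch([("a", today), ("a", today), ("b", today)])
# A repeat of ("a", today) is dropped; the new row "c" is emitted immediately.
batch2 = dedup.process_batch([("a", today), ("c", today + timedelta(minutes=10))])
# A row five days old is behind the watermark and is dropped.
batch3 = dedup.process_batch([("d", today - timedelta(days=5))])

print(batch1)
print(batch2)
print(batch3)
```

In this model the 72 hours only bound how long a key is remembered and how late a row may arrive; they never delay the output of a first occurrence, which matches rows for 2023-07-18 showing up in the destination right away.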