pysparkspark-structured-streamingwatermarkdelta-live-tablesdlt

Databricks DLT streaming with sliding window missing last window interval


I have a DLT pipeline where I want to calculate the rolling average of a column for the last 24 hours which is updated every hour.

I'm using the below code to achieve this:

@dlt.table()
def gold():
    df = dlt.read_stream("silver_table")
    
    # Define window for 24 hours with 1-hour slide
    window_spec_24h = window("fetch_ts", "24 hours", "1 hour")


    df.withWatermark("fetch_ts", "10 minutes")\
      .groupBy(df.Id, window_spec_24h)\
      .agg(avg("foo").alias("average_foo_24h"))


    return df

My issue is, I'm always missing the last window in my result df. For instance, if my input df has the following fetch_ts values:

2024-02-23T15:04:00.000
2024-02-23T16:04:00.000
2024-02-23T16:05:00.000
2024-02-23T16:54:00.000
2024-02-23T17:06:00.000
2024-02-23T18:54:00.000

the output df has the following windows:

{"start":"2024-02-22T16:00:00.000Z","end":"2024-02-23T16:00:00.000Z"}
{"start":"2024-02-22T17:00:00.000Z","end":"2024-02- 23T17:00:00.000Z"}
{"start":"2024-02-22T18:00:00.000Z","end":"2024-02-23T18:00:00.000Z"}

which means that my last row with the "2024-02-23T18:54:00.000" fetch_ts is getting excluded in the calculation.

During the next trigger, the previously missing window appears, but this time the last window of the latest batch doesn't appear. It goes on like this.

Any idea why this is happening? Or is it by design and I'm missing something? Is there a way to add the last window {"start":"2024-02-22T19:00:00.000Z","end":"2024-02-23T19:00:00.000Z"} as well so that I can include the last row in my calculation?

Thanks and regards,

(I tried removing the watermarking and then I can get the latest window as well but removing the watermarking is not an option as I want to join the calculated df with the original one to include the other columns. Stream-stream joins are not allowed without watermark.)


Solution

  • Under the hood, DLT is using Structured Streaming, so I'll explain the semantics of when windows are emitted.

    In a streaming aggregation, records may arrive late and out of order. To deal with this, SS will buffer records in time windows. At the end of each batch, the stream's output mode determines which records will be emitted downstream. However, the output mode is hidden from you in DLT and not directly configurable.

    The default is the Append output mode, which emits the aggregates that will not change in subsequent triggers. Which aggregates won't change? The aggregates that won't change are the ones that won't receive any more records; the windows that won't receive any more records are the ones whose end timestamp is less than the watermark. Since a watermark of 3pm says that the stream will no longer receive records before 3pm, the engine will close the windows that end before 3pm.

    So, in your example, that last record of 2024-02-23T18:54:00.000 is actually being processed! That record causes the watermark to advance to 10 minutes before that, 2024-02-23T18:44:00.00, which is indeed greater than the end of the last window (the one ending at 2024-02-23T18:00:00.000Z) you're seeing.

    Then, when you push new records through, you cause the watermark to advance further, which causes the buffered records to be emitted. That's the answer to:

    During the next trigger, the previously missing window appears, but this time the last window of the latest batch doesn't appear.

    If you really wanted to see windows as they are created (and not after the watermark crosses their end), your best bet would be a MV in DLT Serverless.

    Exercises

    You really don't have to do these exercises, since in DLT you're intentionally shielded from output mode. But since this was asked in [spark-structured-streaming] I'm taking liberty to add them :)

    To make these exercises easier to describe, let's just use seconds for our timestamps. We'll do a 10 second tumbling window (not sliding) with a watermark of 5 seconds.

    We receive the records 5, 6, and 9. How many aggregates are emitted downstream?

    No records are emitted. The engine receives 9 and subtracts 5 (the watermark delay), meaning that the watermark is now 4. No windows' ends are less than 4, so nothing is emitted.

    We receive 11. What records are emitted?

    Again, none. When 11 is received, the watermark updates to 11 - 5, which is 6. No windows are less than that.

    What's the minimum timestamp you need to receive for exactly one window to be emitted?

    We want the window [0, 10] to be emitted, so we need to make sure that the watermark is greater than or equal to 10. We add on the watermark delay to 10, giving us 10 + 5 = 15. Thus, a record with timestamp 15 will cause that one window to be emitted.

    Finally, let's develop an interesting comment you made: "It goes on like this." Is there ever a situation in which, using a non-zero watermark delay d and a windowed streaming aggregation, it won't "go on like this?" That is, can you ever get Structured Streaming to emit all windows from intermediary state?

    No, that's not possible. To prove it, suppose not. Suppose the maximum event time we've seen so far is m. Call the the window to which m belongs win(m), and the end of that window end(win(m)). end(win(m)) must be greater than or equal to m, by the definition of a window. Thus, for that window to be emitted, we must receive a record with timestamp greater than or equal to end(win(m)) + d, where d is the watermark delay. Recall that we constrained d to be non-zero. Thus, we have that end(win(m)) >= m and d > 0, so end(win(m)) + d > m. However, that quantity is larger than m, which we said was the largest event-time we had seen—a contradiction.