I am having trouble understanding the concepts of watermarks and allowed lateness.
The following excerpt from the [mail archive|https://www.mail-archive.com/user@flink.apache.org/msg08758.html] explains watermarks, but I still have a couple of questions. The example quoted there is:
Assume you have a BoundedOutOfOrdernessTimestampExtractor with a 2 minute bound and a 10 minute tumbling window that starts at 12:00 and ends at 12:10. If you have the following stream sequence:
    12:01, A
    12:04, B
    WM, 12:02 // 12:04 - 2 minutes
    12:02, C
    12:08, D
    12:14, E
    WM, 12:12
    12:16, F
    WM, 12:14 // 12:16 - 2 minutes
    12:09, G
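To make the "// 12:04 - 2 minutes" comments concrete, here is a minimal sketch of how a bounded-out-of-orderness extractor derives its watermark: the highest timestamp seen so far minus the bound. This is a plain Python simulation at minute granularity, not Flink code; the function names are hypothetical. It computes a candidate watermark after every record, whereas Flink emits watermarks periodically, which would explain why the sequence above only shows some of them.

```python
def to_minutes(hhmm):
    """Convert 'HH:MM' to minutes since midnight."""
    h, m = hhmm.split(":")
    return int(h) * 60 + int(m)


def fmt(minutes):
    """Convert minutes since midnight back to 'HH:MM'."""
    return f"{minutes // 60:02d}:{minutes % 60:02d}"


def candidate_watermarks(timestamps, bound_minutes):
    """Watermark implied after each record: max timestamp so far minus the bound."""
    max_seen = None
    result = []
    for ts in timestamps:
        t = to_minutes(ts)
        max_seen = t if max_seen is None else max(max_seen, t)
        result.append(fmt(max_seen - bound_minutes))
    return result


# Record timestamps of A, B, C, D, E, F, G in arrival order.
records = ["12:01", "12:04", "12:02", "12:08", "12:14", "12:16", "12:09"]
watermarks = candidate_watermarks(records, 2)
# → ['11:59', '12:02', '12:02', '12:06', '12:12', '12:14', '12:14']
```

Note that the out-of-order records C (12:02) and G (12:09) do not move the watermark backwards; it only ever advances.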
No allowed lateness: The window operator forwards the logical time to 12:12 when it receives <WM, 12:12> and evaluates the window, which contains [A, B, C, D] at this time, and finally purges its state. <12:09, G> is later ignored.
Allowed lateness of 3 minutes: The window operator evaluates the window when <WM, 12:12> is received, but its state is not purged yet. The state is purged when <WM, 12:14> is received (window fire time 12:10 + 3 mins allowed lateness). <12:09, G> is again ignored.
Allowed lateness of 5 minutes: The window operator evaluates the window when <WM, 12:12> is received, but its state is not purged yet. When <12:09, G> is received, the window is evaluated again, but this time with [A, B, C, D, G], and an update is sent out. The state is purged when a watermark of >= 12:15 is received.
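The three cases can be replayed with a toy window operator. This is only a sketch in plain Python at minute granularity, not Flink's WindowOperator: the names `simulate` and `STREAM` are made up for illustration, and the real operator compares against the window's maximum timestamp (one millisecond before the window end) rather than the end itself.

```python
WINDOW = 10  # tumbling window size, in minutes


def to_minutes(hhmm):
    """Convert 'HH:MM' to minutes since midnight."""
    h, m = hhmm.split(":")
    return int(h) * 60 + int(m)


def simulate(stream, allowed_lateness):
    """Replay the stream; return (firings, dropped).

    firings: list of (window_start_minutes, contents) in emission order.
    dropped: names of records that arrived after their window was cleaned up.
    """
    watermark = -1
    windows = {}   # window start -> record names buffered so far
    fired = set()  # window starts that have fired at least once
    firings, dropped = [], []
    for kind, ts, name in stream:
        t = to_minutes(ts)
        if kind == "wm":
            watermark = max(watermark, t)
            for start in sorted(windows):
                end = start + WINDOW
                if end <= watermark and start not in fired:
                    firings.append((start, list(windows[start])))  # on-time firing
                    fired.add(start)
                if end + allowed_lateness <= watermark:
                    del windows[start]  # purge state
        else:
            start = t - t % WINDOW
            end = start + WINDOW
            if end + allowed_lateness <= watermark:
                dropped.append(name)  # window already cleaned up
                continue
            windows.setdefault(start, []).append(name)
            if end <= watermark:
                # Window has already fired: emit an updated result.
                firings.append((start, list(windows[start])))
                fired.add(start)
    return firings, dropped


STREAM = [
    ("rec", "12:01", "A"), ("rec", "12:04", "B"), ("wm", "12:02", None),
    ("rec", "12:02", "C"), ("rec", "12:08", "D"), ("rec", "12:14", "E"),
    ("wm", "12:12", None), ("rec", "12:16", "F"), ("wm", "12:14", None),
    ("rec", "12:09", "G"),
]

for lateness in (0, 3, 5):
    print(lateness, simulate(STREAM, lateness))
```

With lateness 0 or 3 the 12:00 window (start 720 in minutes) fires once with [A, B, C, D] and G is dropped; with lateness 5 it fires a second time with [A, B, C, D, G] and nothing is dropped. E and F fall into the 12:10-12:20 window, which never fires here because the stream ends before a watermark reaches 12:20.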
As I understand:
My questions based on the understanding:
If you have the following stream sequence:
    12:01, A
    12:04, B
    WM, 12:02 // 12:04 - 2 minutes
    12:02, C
    12:01, CCC // Inserted by Sheel
    12:08, D
    12:14, E
    WM, 12:12
    12:16, F
    WM, 12:14 // 12:16 - 2 minutes
    12:09, G
The inserted record <12:01, CCC> is still in the 12:00-12:10 window but behind the watermark <WM, 12:02>. Let's say the allowed lateness is 5 minutes. Will this record be accepted "somehow" because of the allowed lateness, or will it be dropped because the watermark has already passed 12:02?
Watermarks control the lifetime of a window, but they do not directly decide whether a record is dropped. When Flink's WindowOperator receives a new record, it calculates the set of windows the record falls into. If this set contains at least one active window, meaning that no watermark has yet reached the window's end time + allowed lateness, the record is assigned to this window and becomes part of the window computation (even if the record's timestamp is lower than the last seen watermark). Hence, one could say that windows reduce the resolution of watermarks with respect to individual records.
In your case, this means that both C and CCC will become part of the window 12:00 - 12:10, since the system hasn't seen a watermark >= 12:10 yet.
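The rule described above can be written as a single comparison. The following is a hedged sketch at minute granularity, assuming 10 minute tumbling windows aligned to the hour; `is_accepted` is a hypothetical helper, not a Flink API.

```python
WINDOW = 10  # tumbling window size, in minutes


def to_minutes(hhmm):
    """Convert 'HH:MM' to minutes since midnight."""
    h, m = hhmm.split(":")
    return int(h) * 60 + int(m)


def is_accepted(record_ts, current_watermark, allowed_lateness):
    """True if the record's window is still active when the record arrives.

    The record's own timestamp is only used to find its window; the decision
    compares the watermark against window end + allowed lateness.
    """
    t = to_minutes(record_ts)
    window_end = t - t % WINDOW + WINDOW
    return to_minutes(current_watermark) < window_end + allowed_lateness


# CCC (12:01) arrives after <WM, 12:02>: accepted even with no allowed lateness.
ccc_no_lateness = is_accepted("12:01", "12:02", 0)   # True
ccc_5_minutes = is_accepted("12:01", "12:02", 5)     # True

# G (12:09) arrives after <WM, 12:14>: only accepted with 5 minutes of lateness.
g_3_minutes = is_accepted("12:09", "12:14", 3)       # False
g_5_minutes = is_accepted("12:09", "12:14", 5)       # True
```

So for CCC the allowed lateness never comes into play: the watermark 12:02 is still well before the window end of 12:10.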