I am trying to understand whether I can achieve the following behavior with Kafka Streams, with grace configuration or in any other way:
- Time is taken from a field in the message, let's call it the `messageTime` field.
- Windows are 1 minute long and are based on the `messageTime` field, e.g. `10:00-10:01`, `10:01-10:02`, `10:02-10:03`.
- If a message with `messageTime` value `10:00:00` arrives at 10:02:15, I want it to go into the `10:00-10:01` window.
- If a message with `messageTime` value `10:01:00` arrives at 10:02:15, I want it to go into the `10:01-10:02` window.
- If a message with `messageTime` value `10:02:00` arrives at 10:02:14, I want it to go into the `10:02-10:03` window.
- If a message with `messageTime` value `10:00:00` arrives at 10:03:01, I want it to be discarded (since it is beyond the 1 min window plus 2 mins grace).
- I want `10:00-10:01` to be closed and flushed at 10:03 (1 min + 2 mins grace).
- I want `10:01-10:02` to be closed and flushed at 10:04 (1 min + 2 mins grace).
- I want `10:02-10:03` to be closed and flushed at 10:05 (1 min + 2 mins grace).

Yes, you would use `TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1L), Duration.ofMinutes(2))` to specify the window.

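To see how a 1 minute window with 2 minutes of grace treats the timestamps from the question, here is a plain-Java model of the semantics. It is not Kafka Streams API code: the class and method names are made up for this sketch, and it assumes that stream time matches the arrival times given in the question and that a record is dropped once stream time reaches window end plus grace.

```java
import java.time.Duration;
import java.time.LocalTime;

// Plain-Java model of tumbling windows with grace; all names are hypothetical.
final class TumblingWindowSketch {
    static final long SIZE_MS = Duration.ofMinutes(1).toMillis();
    static final long GRACE_MS = Duration.ofMinutes(2).toMillis();

    // Milliseconds since midnight for a time string such as "10:02:15".
    static long ms(String time) {
        return LocalTime.parse(time).toNanoOfDay() / 1_000_000L;
    }

    // Start of the tumbling window that contains the record timestamp.
    static long windowStart(long recordTs) {
        return recordTs - (recordTs % SIZE_MS);
    }

    // Assumption: a record is accepted while stream time is still
    // strictly before (window end + grace).
    static boolean accepted(long recordTs, long streamTime) {
        long windowEnd = windowStart(recordTs) + SIZE_MS;
        return streamTime < windowEnd + GRACE_MS;
    }

    public static void main(String[] args) {
        // {messageTime, stream time when the record arrives}, from the question.
        String[][] cases = {
            {"10:00:00", "10:02:15"},
            {"10:01:00", "10:02:15"},
            {"10:02:00", "10:02:14"},
            {"10:00:00", "10:03:01"},
        };
        for (String[] c : cases) {
            long ts = ms(c[0]);
            LocalTime start = LocalTime.ofNanoOfDay(windowStart(ts) * 1_000_000L);
            String verdict = accepted(ts, ms(c[1])) ? "accepted" : "dropped";
            System.out.println(c[0] + " at " + c[1] + ": window starting " + start + ", " + verdict);
        }
    }
}
```

Under these assumptions the first three records land in their windows and the fourth is dropped, which is the behavior asked for in the question.
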
> Time is taken from a field in the message, lets call it messageTime field

You can write a custom `TimestampExtractor` to use `messageTime` as the timestamp, and pass it via `StreamsConfig`.

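The core of such an extractor is turning the `messageTime` field into epoch milliseconds. The sketch below shows only that logic in plain Java, so it runs without Kafka on the classpath. It assumes, purely for illustration, that `messageTime` is an ISO-8601 string, and it falls back to the partition time when the field is missing or malformed. The class and method names are hypothetical. In a real extractor this logic would sit inside `TimestampExtractor.extract(ConsumerRecord<Object, Object> record, long partitionTime)`, and the class would be registered via `StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG`.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

// Hypothetical helper holding the logic a custom TimestampExtractor could delegate to.
final class MessageTimeSketch {

    // Assumption: messageTime is ISO-8601 text such as "2024-01-01T10:00:00Z".
    // Returns epoch milliseconds, or partitionTime if the field is unusable.
    static long extractTimestamp(String messageTime, long partitionTime) {
        if (messageTime == null) {
            return partitionTime;
        }
        try {
            return Instant.parse(messageTime).toEpochMilli();
        } catch (DateTimeParseException e) {
            // Design choice for this sketch: fall back rather than fail.
            return partitionTime;
        }
    }
}
```

Falling back to the partition time is only one option; depending on the use case, failing fast on a malformed field may be preferable.
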
If you only want to get a single final result when a window is closed, you can use either `suppress()` (on the `KTable` result) or `windowedBy(...).emitStrategy(...).aggregate(...)`. (Note that windows are only closed when "stream time" advances beyond their close time, so if input data stops flowing, time stops advancing and windows are kept open.)
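
The point about "stream time" can be illustrated with a small plain-Java model. It is a sketch, not Kafka Streams code: it assumes stream time is the highest record timestamp observed so far, and that a window counts as closed once stream time reaches window end plus grace. All names are hypothetical, and timestamps are milliseconds relative to 10:00:00.

```java
// Plain-Java model of stream time; all names are hypothetical.
final class StreamTimeSketch {
    static final long SIZE_MS = 60_000L;   // 1 minute window
    static final long GRACE_MS = 120_000L; // 2 minutes grace

    // No record observed yet.
    private long streamTime = Long.MIN_VALUE;

    // Assumption: stream time is the maximum timestamp seen so far,
    // so out-of-order records never move it backwards.
    long observe(long recordTs) {
        streamTime = Math.max(streamTime, recordTs);
        return streamTime;
    }

    // A window is closed once stream time reaches (window end + grace).
    boolean isClosed(long windowStart) {
        return streamTime >= windowStart + SIZE_MS + GRACE_MS;
    }

    public static void main(String[] args) {
        StreamTimeSketch s = new StreamTimeSketch();
        s.observe(0L);        // record stamped 10:00:00
        s.observe(179_999L);  // record stamped 10:02:59.999
        System.out.println("closed after 10:02:59.999? " + s.isClosed(0L));
        s.observe(180_000L);  // record stamped 10:03:00
        System.out.println("closed after 10:03:00? " + s.isClosed(0L));
    }
}
```

In this model nothing closes the window except a new record with a high enough timestamp, which is why windows stay open when input stops.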