I have a stream of <K,V> messages. When a satisfied sliding window is emitted, I want to know the list of messages that the window matched.
I want to avoid taking on the accumulation job myself inside my .aggregate() updater lambda by appending each payload value to an internal List kept in my aggregation state store, so that every state store record ends up with its own embedded list of contributing messages. I suppose this makes sense if I really must do it, since it is just another form of accumulation, albeit not a numeric or mathematical one.
My understanding is that:
.groupByKey().windowedBy().aggregate() uses a Windowed<K>-keyed store, i.e. the grouping key K augmented with the start/end timestamps of each unique sliding window
.groupByKey().aggregate() uses just the grouping key K
but other than the finer grain of the Windowed<K> key, which adds the timestamp range to distinguish the sliding windows for you, neither pipeline has any innate way to remember the messages that make up a closed aggregation. In both cases I need to accumulate them myself, as mentioned above (see the sketch below).
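To illustrate what I mean, a minimal sketch of the key-type difference between the two pipelines (the topic name "events" and the String payloads are placeholders I made up):

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class KeyTypeSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream =
                builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

        // Windowed pipeline: result keys are Windowed<String>, i.e. the grouping
        // key plus the window's start/end timestamps.
        KTable<Windowed<String>, Long> windowedCounts = stream
                .groupByKey()
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(
                        Duration.ofMinutes(5), Duration.ofMinutes(1)))
                .count();

        // Unwindowed pipeline: result keys are just the grouping key.
        KTable<String, Long> totalCounts = stream
                .groupByKey()
                .count();
    }
}
```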
Is there any better pattern that doesn't need this sort of internal list accumulation, when what you want is the set of matched messages at the moment an aggregation window closes as satisfied?
When any given message V is processed in the Windowed<K> aggregation, does my updater get called possibly multiple times (once for the sliding window that V actually starts, and N more times for any other ongoing sliding windows that V falls within the range of)?
Is message-contribution dedupe (i.e. ensuring a message contributes to at most one satisfied sliding window) also up to my own tracking?
Is minimum-frequency matching (i.e. "find 7+ messages within 5 minutes" rather than "find exactly 7 messages within 5 minutes") supported? I don't see any window-end temporal triggering, so it seems you can only emit when something proactively streams through to trigger processing. There is no "at end of temporal window + grace, run a final check and emit", right?
Your approach of accumulating all input records into a list is the correct one. Kafka Streams does not automatically preserve all input records; it only stores the "aggregate per window". In your case, the aggregate is not a "reduction" yet, but a list of all input events.
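For example, a minimal sketch of that list aggregation (the Event type and the two serdes are placeholders; plug in your own value type and serdes):

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class ListAggregationSketch {

    // Placeholder payload type; use your real value type here.
    public record Event(String payload) {}

    public static KTable<Windowed<String>, List<Event>> build(
            StreamsBuilder builder, Serde<Event> eventSerde, Serde<List<Event>> listSerde) {
        return builder
                .stream("events", Consumed.with(Serdes.String(), eventSerde))
                .groupByKey()
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(
                        Duration.ofMinutes(5), Duration.ofMinutes(1)))
                .aggregate(
                        ArrayList::new,          // initializer: one empty list per window instance
                        (key, event, list) -> {  // adder: append the raw input record
                            list.add(event);
                            return list;
                        },
                        Materialized.with(Serdes.String(), listSerde));
    }
}
```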
And yes, for sliding windows (the same holds for hopping windows, i.e., all "overlapping" window types), your aggregation logic may be called multiple times per input record, once for each window the input record belongs to.
Not sure if I understand question (3). But for sliding windows, a single input record can belong to multiple window instances; how many is data-dependent.
Also not sure what you mean by question (4). But maybe windowedBy(...).emitStrategy(...) can help (or the suppress() operator, which you can apply to the result KTable).
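For example, along these lines (a sketch only; emitStrategy() requires Kafka Streams 3.3+, and stream is a placeholder KStream<String, String>):

```java
import java.time.Duration;

import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Suppressed;

public class WindowCloseSketch {
    static void build(KStream<String, String> stream) {
        // Option 1: emit only one final result per window, after window end + grace
        // has passed. A downstream filter then gives "7+ messages within 5 minutes".
        stream.groupByKey()
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(
                        Duration.ofMinutes(5), Duration.ofMinutes(1)))
                .emitStrategy(EmitStrategy.onWindowClose())
                .count()
                .toStream()
                .filter((windowedKey, count) -> count >= 7);

        // Option 2: keep the default emit strategy, but suppress intermediate
        // updates on the result KTable until the window closes.
        stream.groupByKey()
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(
                        Duration.ofMinutes(5), Duration.ofMinutes(1)))
                .count()
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .filter((windowedKey, count) -> count >= 7);
    }
}
```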