I've created a KTable for a topic using streamsBuilder.table("myTopic"), which I materialize to a state store so that I can use interactive queries.
Every hour, I want to remove records from this state store (and associated changelog topic) whose values haven't been updated in the past hour.
I believe this may be possible using a punctuator, but I've only used the DSL so far, and so am not sure of how to proceed. I would be very grateful if somebody could provide me with an example.
Thanks,
Jack
It is possible to mix and match the Processor API with the DSL, but you can't call process() on a KTable directly. You would need to convert it to a KStream first (for example with toStream()). Alternatively, you could create a new topology with a Processor that interacts with the state store.
You will also need some way to determine whether a record is older than one hour, and that information has to be stored somewhere. One option is to add a timestamp to each record in the state store.
In the init method of a Processor, you could call context.schedule to register a punctuator that iterates over the records in the state store and removes the old ones:
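// Assumption: MyValue and getLastUpdatedMs() are hypothetical; each stored value carries its last-update time in epoch millis.
context.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
    // Anything last updated before this instant is considered stale
    Instant cutoff = Instant.ofEpochMilli(timestamp).minus(Duration.ofHours(1));
    // all() returns a KeyValueIterator, which should be closed after use
    try (KeyValueIterator<String, MyValue> iterator = myStateStore.all()) {
        iterator.forEachRemaining(keyValue -> {
            if (Instant.ofEpochMilli(keyValue.value.getLastUpdatedMs()).isBefore(cutoff)) {
                myStateStore.delete(keyValue.key);
            }
        });
    }
});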
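To try out the timestamp-per-record idea and the expiry check without a running Kafka cluster, here is a minimal plain-Java sketch. It is only an illustration: a HashMap stands in for the state store, and TimestampedRecord, ExpiryDemo, isExpired and purge are hypothetical names that are not part of Kafka Streams.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical value wrapper: the payload plus the time it was last written. */
final class TimestampedRecord {
    final String payload;
    final long lastUpdatedMs;

    TimestampedRecord(String payload, long lastUpdatedMs) {
        this.payload = payload;
        this.lastUpdatedMs = lastUpdatedMs;
    }

    /** True if the last update lies strictly before (nowMs - maxAge). */
    boolean isExpired(long nowMs, Duration maxAge) {
        Instant cutoff = Instant.ofEpochMilli(nowMs).minus(maxAge);
        return Instant.ofEpochMilli(lastUpdatedMs).isBefore(cutoff);
    }
}

final class ExpiryDemo {
    /** Removes expired entries from the map (a stand-in for the state store) and returns how many were removed. */
    static int purge(Map<String, TimestampedRecord> store, long nowMs, Duration maxAge) {
        int before = store.size();
        store.values().removeIf(record -> record.isExpired(nowMs, maxAge));
        return before - store.size();
    }

    public static void main(String[] args) {
        Map<String, TimestampedRecord> store = new HashMap<>();
        store.put("stale", new TimestampedRecord("a", 0L));          // written at t = 0
        store.put("fresh", new TimestampedRecord("b", 7_000_000L));  // written shortly before "now"

        long nowMs = 7_200_000L; // two hours after t = 0
        int removed = purge(store, nowMs, Duration.ofHours(1));

        // prints "removed=1 remaining=[fresh]"
        System.out.println("removed=" + removed + " remaining=" + store.keySet());
    }
}
```

In a real Processor the same isExpired check would run inside the punctuator, and the wrapper's timestamp would be refreshed whenever a record for that key is written to the store. A custom Serde for the wrapper type would also be needed, which this sketch leaves out.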