
How to remove old records from a state store using a punctuator? (Kafka)


I've created a KTable for a topic using streamsBuilder.table("myTopic"), which I materialize to a state store so that I can use interactive queries.

Every hour, I want to remove records from this state store (and associated changelog topic) whose values haven't been updated in the past hour.

I believe this may be possible using a punctuator, but I've only used the DSL so far, so I'm not sure how to proceed. I would be very grateful if somebody could provide me with an example.

Thanks,

Jack


Solution

  • It is possible to mix and match the Processor API with the DSL, but a KTable has no process() method, so you cannot attach a Processor to it directly. You would need to convert it to a KStream first. Alternatively, you could create a new topology with a Processor that interacts with the state store.

    You will also need to store enough state to tell whether a record is older than one hour. One option is to store a timestamp (for example, the wall-clock time of the last update) alongside each value in the state store.

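    In the init method of a Processor, you could call context.schedule(...) to register a punctuator. Each time it fires, it iterates over the records in the state store and deletes the old ones; if the store has change logging enabled (the default), each delete is also written to the store's changelog topic as a tombstone: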

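    // Assumes myStateStore is a KeyValueStore<String, Long> of last-update times (epoch ms)
    context.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
        long cutoff = timestamp - Duration.ofHours(1).toMillis(); // older than this = stale
        List<String> staleKeys = new ArrayList<>();
        // The iterator holds store resources, so close it via try-with-resources
        try (KeyValueIterator<String, Long> iter = myStateStore.all()) {
            iter.forEachRemaining(keyValue -> {
                if (keyValue.value < cutoff) {
                    staleKeys.add(keyValue.key);
                }
            });
        }
        staleKeys.forEach(myStateStore::delete); // delete only after the iterator is closed
    });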
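Putting these pieces together, here is a minimal sketch of the "convert to a KStream" option. It assumes Kafka Streams 3.x or later (the org.apache.kafka.streams.processor.api Processor API), String keys and values, and a hypothetical store name, my-expiring-store. Rather than deleting from the KTable's own store, the processor keeps its own copy of myTopic in a timestamped key-value store and reuses the timestamp slot of each ValueAndTimestamp for the wall-clock time of the last update. The class name ExpiringTableProcessor and the buildTopology() helper are illustrative, not part of Kafka.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

// Hypothetical processor: mirrors "myTopic" into its own store and evicts
// keys that have not been updated for MAX_AGE.
class ExpiringTableProcessor implements Processor<String, String, Void, Void> {
    static final String STORE = "my-expiring-store"; // hypothetical store name
    static final Duration MAX_AGE = Duration.ofHours(1);

    private TimestampedKeyValueStore<String, String> store;

    @Override
    public void init(ProcessorContext<Void, Void> context) {
        store = context.getStateStore(STORE);
        // WALL_CLOCK_TIME fires about once per MAX_AGE even if no new records arrive.
        context.schedule(MAX_AGE, PunctuationType.WALL_CLOCK_TIME, now -> {
            long cutoff = now - MAX_AGE.toMillis();
            List<String> staleKeys = new ArrayList<>();
            // The iterator must be closed, hence try-with-resources.
            try (KeyValueIterator<String, ValueAndTimestamp<String>> iter = store.all()) {
                while (iter.hasNext()) {
                    KeyValue<String, ValueAndTimestamp<String>> kv = iter.next();
                    if (kv.value.timestamp() < cutoff) {
                        staleKeys.add(kv.key);
                    }
                }
            }
            // With change logging on (the default), each delete also writes a changelog tombstone.
            staleKeys.forEach(store::delete);
        });
    }

    @Override
    public void process(Record<String, String> record) {
        if (record.value() == null) {
            store.delete(record.key()); // tombstone in the input topic
        } else {
            // Assumption: the timestamp slot holds the wall-clock time of the
            // last update, not the Kafka record timestamp.
            store.put(record.key(),
                    ValueAndTimestamp.make(record.value(), System.currentTimeMillis()));
        }
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.timestampedKeyValueStoreBuilder(
                Stores.persistentTimestampedKeyValueStore(STORE),
                Serdes.String(), Serdes.String()));
        // A typed supplier variable avoids overload ambiguity with the older process() API.
        ProcessorSupplier<String, String, Void, Void> supplier = ExpiringTableProcessor::new;
        builder.stream("myTopic", Consumed.with(Serdes.String(), Serdes.String()))
               .process(supplier, STORE);
        return builder.build();
    }
}
```

You would start the application with the Topology returned by buildTopology() and query the store through interactive queries, for example with StoreQueryParameters.fromNameAndType(ExpiringTableProcessor.STORE, QueryableStoreTypes.timestampedKeyValueStore()). Wall-clock punctuation is not exact, so a record may survive somewhat longer than one hour before it is removed.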