apache-kafka, apache-kafka-streams

Kafka Streams application does not purge repartition topic records when commits are triggered manually


I'm currently developing a Kafka Streams application. Due to internal requirements, I've disabled automatic commits by setting commit.interval.ms to a very long duration (e.g. Long.MAX_VALUE). Instead, I'm using the low-level Processor API to trigger commits explicitly from the application.
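For reference, outside of Spring the same setup looks roughly like the following with plain StreamsConfig properties (a minimal sketch; the application id and bootstrap server are placeholders, and the purge interval is passed as a literal key here):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sample-stream-app"); // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// effectively disables periodic commits; commits are requested via ProcessorContext#commit() instead
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
// expectation: committed repartition records are purged every 5 minutes
props.put("repartition.purge.interval.ms", 5 * 60 * 1000L);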

I've observed that the repartition topics used by my application keep growing indefinitely because their records are never deleted. After some investigation, it appears that StreamThread does not attempt to purge repartition topic records when commits are triggered manually by the user, as opposed to periodic auto-commits. I've looked at the source code, specifically around

https://github.com/apache/kafka/blob/93adaea59990da36730b6f07675cb5ca9a54ff43/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1809
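My reading of the code around that line (paraphrased and heavily simplified below, so names and structure may not match the actual source) is that the purge call sits inside the branch that only runs once commit.interval.ms has elapsed, while commits requested via ProcessorContext#commit() are handled in the other branch and never reach it:

// Paraphrased sketch of StreamThread#maybeCommit(), not the actual source
int maybeCommit() {
    final int committed;
    if (now - lastCommitMs > commitTimeMs) {                     // commit.interval.ms gate
        committed = taskManager.commit(allTasks);
        if (committed > 0 && now - lastPurgeMs > purgeTimeMs) {  // repartition.purge.interval.ms gate
            taskManager.maybePurgeCommittedRecords();            // deletes committed repartition records
            lastPurgeMs = now;
        }
        lastCommitMs = now;
    } else {
        // manual commits requested via the Processor API end up here and never hit the purge call above
        committed = taskManager.commit(tasksWithCommitRequested);
    }
    return committed;
}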

I'm wondering if this is the intended behavior in Kafka Streams' design.


Expected Result: Regardless of whether commits are triggered by manual user requests or by automatic commits based on the commit.interval.ms configuration, repartition topic records should be purged up to the currently committed offset once the repartition.purge.interval.ms interval has elapsed.

Actual Result: Repartition topic records are purged only when an automatic commit triggered by the commit.interval.ms setting occurs.
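One way to observe this (a sketch; the topic and bootstrap server names are placeholders, so point it at the actual "-repartition" topic created for your application) is to check whether the earliest offsets of the repartition topic ever advance. With manual commits only, they stay at 0 while the end offsets keep growing:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class RepartitionPurgeCheck {
    public static void main(String[] args) throws Exception {
        String topic = "sample-stream-app-repartition-repartition"; // placeholder name
        try (Admin admin = Admin.create(Map.<String, Object>of("bootstrap.servers", "localhost:9092"))) {
            var partitions = admin.describeTopics(List.of(topic))
                    .allTopicNames().get().get(topic).partitions();
            Map<TopicPartition, OffsetSpec> request = partitions.stream()
                    .collect(Collectors.toMap(
                            p -> new TopicPartition(topic, p.partition()),
                            p -> OffsetSpec.earliest()));
            ListOffsetsResult result = admin.listOffsets(request);
            // If purging works, the earliest (log start) offsets advance after each purge;
            // if not, they remain at 0 and the repartition topic grows without bound.
            result.all().get().forEach((tp, info) ->
                    System.out.println(tp + " earliest offset = " + info.offset()));
        }
    }
}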


Steps to Reproduce (using Spring Cloud Stream)

SomeProcessor.java

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.springframework.context.annotation.Bean;

@Bean
public Function<KStream<String, String>, KStream<String, Set<String>>> sampleStream() {
    return inputStream ->
        inputStream
            .map((key, value) -> KeyValue.pair(String.valueOf(key.hashCode()), value)) // some key manipulation to force a repartition
            .repartition()
            .process(() -> new Processor<String, String, String, Set<String>>() {
                private ProcessorContext<String, Set<String>> context;
                private final Map<String, Set<String>> valueMap = new HashMap<>();

                @Override
                public void init(ProcessorContext<String, Set<String>> context) {
                    this.context = context;
                    // flush the accumulated values once per minute
                    context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, this::forward);
                }

                @Override
                public void process(Record<String, String> record) {
                    // accumulate values per key until the next punctuation
                    valueMap.computeIfAbsent(record.key(), k -> new HashSet<>()).add(record.value());
                }

                @Override
                public void close() {
                    forward(context.currentSystemTimeMs());
                }

                private void forward(long timestamp) {
                    valueMap.forEach((key, value) -> context.forward(new Record<>(key, value, timestamp)));
                    valueMap.clear(); // reset the accumulated state after forwarding
                    context.commit(); // manual commit via the Processor API
                }
            });
}

application.yaml

spring:
  cloud:
    stream:
      kafka.streams:
        bindings:
          sampleStream-in-0:
            consumer:
              configuration:
                commit.interval.ms: 9223372036854775807 # Long.MAX_VALUE, effectively disables periodic commits
                repartition.purge.interval.ms: 300000 # 5 minutes

Solution

  • What you observe is not by design, but a bug. I filed a ticket for it: https://issues.apache.org/jira/browse/KAFKA-19539