I'm currently developing a Kafka Streams application. Due to internal requirements, I've disabled auto-commits by setting commit.interval.ms to a very long duration (e.g., Long.MAX_VALUE). Instead, I'm using the low-level Processor API to explicitly trigger commits from the application.
I've observed that the records in the repartition topics used by my application are not being deleted and are growing indefinitely. Upon some investigation, it appears that StreamThread does not attempt to purge repartition topic records when commits are triggered manually by the user, as opposed to periodic auto-commits. I've looked at the source code, specifically around StreamThread's commit handling.
I'm wondering if this is the intended behavior in Kafka Streams' design.
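For completeness, this is roughly how the behavior can be observed: when purging works, the earliest (log start) offset of the repartition topic advances as delete-records requests are issued; in my case it never moves while the end offset keeps growing. The sketch below uses the Admin client; the bootstrap address, topic name, and partition number are placeholders (the actual repartition topic is named <application.id>-<name>-repartition).

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

public class RepartitionOffsetCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Placeholder topic name and partition.
            TopicPartition tp = new TopicPartition("my-app-repartition-topic", 0);

            // Earliest (log start) offset: only advances when Kafka Streams issues delete-records.
            ListOffsetsResultInfo earliest =
                    admin.listOffsets(Map.of(tp, OffsetSpec.earliest())).partitionResult(tp).get();
            // End offset: keeps growing as new records are written to the repartition topic.
            ListOffsetsResultInfo latest =
                    admin.listOffsets(Map.of(tp, OffsetSpec.latest())).partitionResult(tp).get();

            System.out.printf("log-start-offset=%d end-offset=%d%n", earliest.offset(), latest.offset());
        }
    }
}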
Expected Result: Regardless of whether commits are triggered by manual user requests or by automatic commits based on the commit.interval.ms configuration, repartition topic records should be purged up to the currently committed offset once the repartition.purge.interval.ms duration has passed.
Actual Result: Repartition topic records are only purged when auto-commits triggered by the commit.interval.ms setting occur.
SomeProcessor.java
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SomeProcessor {

    @Bean
    public Function<KStream<String, String>, KStream<String, Set<String>>> sampleStream() {
        return inputStream -> inputStream
                .map((key, value) -> KeyValue.pair(String.valueOf(key.hashCode()), value)) // do some key manipulation
                .repartition()
                .process(() -> new Processor<String, String, String, Set<String>>() {

                    private ProcessorContext<String, Set<String>> context;
                    private final Map<String, Set<String>> valueMap = new HashMap<>();

                    @Override
                    public void init(ProcessorContext<String, Set<String>> context) {
                        this.context = context;
                        // Forward and commit once per minute of wall-clock time.
                        context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, this::forward);
                    }

                    @Override
                    public void process(Record<String, String> record) {
                        // Accumulate values per key until the next punctuation.
                        valueMap.computeIfAbsent(record.key(), k -> new HashSet<>()).add(record.value());
                    }

                    @Override
                    public void close() {
                        this.forward(context.currentSystemTimeMs());
                    }

                    private void forward(long timestamp) {
                        valueMap.forEach((key, value) -> context.forward(new Record<>(key, value, timestamp)));
                        // context.commit() only requests a commit; the StreamThread performs it on its next loop iteration.
                        context.commit();
                    }
                });
    }
}
application.yaml
spring:
  cloud:
    stream:
      kafka.streams:
        bindings:
          sampleStream-in-0:
            consumer:
              configuration:
                commit.interval.ms: 9223372036854775807 # disables auto commit
                repartition.purge.interval.ms: 300000 # 5 minutes
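For reference, outside the Spring Cloud Stream binder the same two settings would be passed as plain Kafka Streams properties, roughly like this (application.id and bootstrap.servers are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ManualCommitConfig {

    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sample-stream-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // Effectively disables periodic auto-commits (Long.MAX_VALUE milliseconds).
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);
        // Purge committed repartition records at most once every 5 minutes.
        props.put("repartition.purge.interval.ms", 300_000L);
        return props;
    }
}

These properties would then be handed to the KafkaStreams constructor together with the topology.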
What you observe is not by design, but a bug. I filed a ticket for it: https://issues.apache.org/jira/browse/KAFKA-19539