I have a @KafkaListener function which consumes messages. The listener has the properties set in application.yml:
So i want to process one message at a time, acknowledging it if everything works fine. If i loose db connection then i should stop processing the messages and the last commited offset stops increasing as it should. For example:
After db connection up and i produce a new message, the last-committed-offset suddenly jumps to 17, but the listener function is not called to process previous 10-16 messages:
Can you help me why this can happen? I thought it will automatically run the function n-times.
@KafkaListener(topics = {TOPIC}, groupId = "mxsmart-center")
public void handle(ConsumerRecord<String, RoomMessage> record, Consumer<String, RoomMessage> consumer, Acknowledgment ack) {
log.info("listener started" + record.value().getCreatedAt());
TopicPartition partition = new TopicPartition(record.topic(), record.partition());
// Fetch end offset (the next offset to be written)
Map<TopicPartition, OffsetAndMetadata> endOffsets = consumer.committed(Collections.singleton(partition));
log.info("Current offset: {}, Latest offset: {}", record.offset(), endOffsets);
if (isDBUp() && isMessageValid(record.value())) {
try {
ack.acknowledge();
log.info("Acknowledged");
} catch (Exception e) {
lastKnownState = ConnectionState.DOWN;
log.error("Error while processing message: ", e);
throw e;
}
} else {
throw new RuntimeException("DB is not in UP state, skipping message");
}
}
commitOnError
Set this property explicitly to avoid committing on failures:
yaml
spring:
kafka:
listener:
ack-mode: manual_immediate
ack-on-error: false
In Java config:
containerFactory.getContainerProperties().setAckOnError(false);
Set a SeekToCurrentErrorHandler
or DefaultErrorHandler
that seeks the offset back instead of committing or skipping:
@Bean
public DefaultErrorHandler errorHandler() {
return new DefaultErrorHandler(
(record, exception) -> {
// Handle error logging etc.
},
new FixedBackOff(0L, 0)); // immediate retry disabled
}
Ensure the error handler is set in the container factory:
factory.setCommonErrorHandler(errorHandler());
This prevents Spring from committing offsets when your handler throws errors (like on DB down).
AckMode
CarefullyYou're using manual_immediate
which is good — it ensures offsets are committed only when ack.acknowledge()
is called. So don't change this, but ensure no implicit commits happen via error handlers or rebalancing.
Since you use auto-offset-reset: earliest
, if offsets were never committed, Kafka would start from the beginning. But once offset 17 is committed (e.g., due to rebalance), Kafka will not go back to earlier messages.