javaspring-kafka

Kafka last commited offset increases even if the consumer does not process


I have a @KafkaListener function which consumes messages. The listener has the properties set in application.yml:

So i want to process one message at a time, acknowledging it if everything works fine. If i loose db connection then i should stop processing the messages and the last commited offset stops increasing as it should. For example:

After db connection up and i produce a new message, the last-committed-offset suddenly jumps to 17, but the listener function is not called to process previous 10-16 messages:

Can you help me why this can happen? I thought it will automatically run the function n-times.

@KafkaListener(topics = {TOPIC}, groupId = "mxsmart-center")
public void handle(ConsumerRecord<String, RoomMessage> record, Consumer<String, RoomMessage> consumer, Acknowledgment ack) {
    log.info("listener started" + record.value().getCreatedAt());
    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
    // Fetch end offset (the next offset to be written)
    Map<TopicPartition, OffsetAndMetadata> endOffsets = consumer.committed(Collections.singleton(partition));
    log.info("Current offset: {}, Latest offset: {}", record.offset(), endOffsets);

    if (isDBUp() && isMessageValid(record.value())) {
        try {
            ack.acknowledge();
            log.info("Acknowledged");
        } catch (Exception e) {
            lastKnownState = ConnectionState.DOWN;
            log.error("Error while processing message: ", e);
            throw e;
        }
    } else {
        throw new RuntimeException("DB is not in UP state, skipping message");
    }
}

Solution

  • Solution

    1. Disable commitOnError

    Set this property explicitly to avoid committing on failures:

    yaml 
    
    spring:
      kafka:
        listener:
          ack-mode: manual_immediate
          ack-on-error: false
    

    In Java config:

    containerFactory.getContainerProperties().setAckOnError(false);

    2. Use an Error Handler That Doesn’t Commit

    Set a SeekToCurrentErrorHandler or DefaultErrorHandler that seeks the offset back instead of committing or skipping:

    @Bean
    public DefaultErrorHandler errorHandler() {
        return new DefaultErrorHandler(
            (record, exception) -> {
                // Handle error logging etc.
            },
            new FixedBackOff(0L, 0)); // immediate retry disabled
    }
    

    Ensure the error handler is set in the container factory:

    factory.setCommonErrorHandler(errorHandler());

    This prevents Spring from committing offsets when your handler throws errors (like on DB down).

    3. Set AckMode Carefully

    You're using manual_immediate which is good — it ensures offsets are committed only when ack.acknowledge() is called. So don't change this, but ensure no implicit commits happen via error handlers or rebalancing.

    Additional: Restart Behavior

    Since you use auto-offset-reset: earliest, if offsets were never committed, Kafka would start from the beginning. But once offset 17 is committed (e.g., due to rebalance), Kafka will not go back to earlier messages.