javaspring-bootspring-kafkaspring-retry

Configure default error handler to commit messages after recovery


I would like to retry consuming a message on processing/deserialiation failure for a finite number of times. After the retries are exhausted, I would like to log a message, commit the offset and move ahead with consuming further messages. For this to happen I have configured the error handler bean as below.

I am using the ack mode as MANUAL_IMMEDIATE and the auto commit is disabled by enable.auto.commit: false Offsets are committed manually/programmatically with a reference to an Acknowledgement object (ack.acknowledge())

EDIT: As per Gary's comments, I have edited the config of error handler

@Bean
public DefaultErrorHandler errorHandler(){

   ConsumerRecordRecoverer recovery = (record, ex) ->{
   
      log.error("Retries have been exhausted. Commiting offset "+record.offset())
    }
  
   // Default backoff
   BackOff backoff = new FixedBackOff(3000, 5);
   DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(recovery, backoff);
   defaultErrorHandler.setCommitRecovered(true);
   defaultErrorHandler.setAckAfterHandle(true);

  // BackOff to use when HttpServerException occurs
     BackOff infiniteRetry = new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);

  BiFunction<ConsumerRecord<?,?>,Exception, BackOff> backOffFunction = (record, ex) -> {
  
  if(ex instanceOf HttpServerException){
      return infiniteRetry;
     }
    return null;
  }
   defaultErrorHandler.removeClassification(DeserializationException.class);
   defaultErrorHandler.setBackOffFunction(backOffFunction)
   return defaultErrorHandler;
}  

Questions:

  1. In the case when retries(5) have been exhausted, the idea is to log a message and commit the offset and move forward with consuming other messages. For this to happen, I have configured the below on the error handler. Please confirm if this is enough?
defaultErrorHandler.setCommitRecovered(true);
defaultErrorHandler.setAckAfterHandle(true);
  1. Since the DesrializationExeption is part of the fatal list, it's not going to be retried. Therefore I configured the error handler to remove it this way for a retry to occur as configured, when a Deserialization error occurs. Please confirm if this is correct?

defaultErrorHandler.removeClassification(DeserializationException.class);

  1. Is there a way to configure a retry policy for a given exception ? For example, retry consuming a message infinitely on a Database non availability error. This was possible in earlier versions of Springboot/Spring-kafka, as the retry policy can be set on the Listener Container. However, I could not figure this in the latest version. Please help me with any sample if its possible.

Note: I have tried the above config and it seems to work. I would like to verify with the experts that this config does what is expected and does not cause any other effects.


Solution

  • Yes, everything is correctly configured.

    Is there a way to configure a retry policy for a given exception ? For example, retry consuming a message infinitely on a Database non availability error.

    See

    /**
     * Set a function to dynamically determine the {@link BackOff} to use, based on the
     * consumer record and/or exception. If null is returned, the default BackOff will be
     * used.
     * @param backOffFunction the function.
     * @since 2.6
     */
    public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
    

    also

    /**
     * Set to true to reset the retry {@link BackOff} if the exception is a different type
     * to the previous failure for the same record. The
     * {@link #setBackOffFunction(BiFunction) backOffFunction}, if provided, will be
     * called to get the {@link BackOff} to use for the new exception; otherwise, the
     * configured {@link BackOff} will be used. Default true since 2.9; set to false
     * to use the existing retry state, even when exceptions change.
     * @param resetStateOnExceptionChange true to reset.
     * @since 2.6.3
     */
    public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) {
    

    (It is now true by default).