I would like to retry consuming a message a finite number of times when processing or deserialization fails. Once the retries are exhausted, I would like to log a message, commit the offset, and move on to consuming further messages. To achieve this, I have configured the error handler bean as shown below.
I am using the ack mode MANUAL_IMMEDIATE, and auto commit is disabled with enable.auto.commit: false.
Offsets are committed programmatically through a reference to an Acknowledgment object (ack.acknowledge()).
EDIT: As per Gary's comments, I have edited the error handler config.
@Bean
public DefaultErrorHandler errorHandler() {
    // Called once retries are exhausted; the record is then treated as recovered
    ConsumerRecordRecoverer recovery = (record, ex) -> {
        log.error("Retries have been exhausted. Committing offset " + record.offset());
    };
    // Default backoff: 3 second interval, at most 5 retry attempts
    BackOff backoff = new FixedBackOff(3000L, 5L);
    DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler(recovery, backoff);
    defaultErrorHandler.setCommitRecovered(true);
    defaultErrorHandler.setAckAfterHandle(true);
    // BackOff to use when HttpServerException occurs
    BackOff infiniteRetry = new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);
    BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction = (record, ex) -> {
        if (ex instanceof HttpServerException) {
            return infiniteRetry;
        }
        // Returning null falls back to the default backoff above
        return null;
    };
    // Make DeserializationException retryable (it is not retried by default)
    defaultErrorHandler.removeClassification(DeserializationException.class);
    defaultErrorHandler.setBackOffFunction(backOffFunction);
    return defaultErrorHandler;
}
Questions:
defaultErrorHandler.setCommitRecovered(true);
defaultErrorHandler.setAckAfterHandle(true);
defaultErrorHandler.removeClassification(DeserializationException.class);
Note: I have tried the above config and it seems to work. I would like to verify with the experts that this config does what is expected and has no unintended side effects.
Yes, everything is correctly configured.
Is there a way to configure a retry policy for a given exception? For example, retry consuming a message indefinitely on a database unavailability error.
See
/**
* Set a function to dynamically determine the {@link BackOff} to use, based on the
* consumer record and/or exception. If null is returned, the default BackOff will be
* used.
* @param backOffFunction the function.
* @since 2.6
*/
public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
also
/**
* Set to true to reset the retry {@link BackOff} if the exception is a different type
* to the previous failure for the same record. The
* {@link #setBackOffFunction(BiFunction) backOffFunction}, if provided, will be
* called to get the {@link BackOff} to use for the new exception; otherwise, the
* configured {@link BackOff} will be used. Default true since 2.9; set to false
* to use the existing retry state, even when exceptions change.
* @param resetStateOnExceptionChange true to reset.
* @since 2.6.3
*/
public void setResetStateOnExceptionChange(boolean resetStateOnExceptionChange) {
(It is now true by default).
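To make the "null means use the default BackOff" contract concrete, here is a minimal plain-Java sketch of the selection logic for the database case. It deliberately does not use the Spring Kafka classes: BackOffSpec, resolve, hasCause and the interval values are hypothetical stand-ins, and java.sql.SQLTransientConnectionException is only an assumed example of a "database unavailable" error. The cause chain is walked because the exception you care about may be wrapped in another one.

```java
import java.sql.SQLTransientConnectionException;
import java.util.function.BiFunction;

class BackOffSelectionSketch {

    /** Hypothetical stand-in for a back-off policy (not a Spring class). */
    static final class BackOffSpec {
        final long intervalMs;
        final long maxAttempts;

        BackOffSpec(long intervalMs, long maxAttempts) {
            this.intervalMs = intervalMs;
            this.maxAttempts = maxAttempts;
        }
    }

    // Assumed values for illustration only
    public static final BackOffSpec DEFAULT_BACK_OFF = new BackOffSpec(3000L, 5L);
    public static final BackOffSpec INFINITE_RETRY = new BackOffSpec(5000L, Long.MAX_VALUE);

    /** Returns true if ex, or any exception in its cause chain, is of the given type. */
    public static boolean hasCause(Throwable ex, Class<? extends Throwable> type) {
        for (Throwable t = ex; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    // Per-exception policy: retry indefinitely on a database outage,
    // return null for everything else so the default applies.
    public static final BiFunction<Object, Exception, BackOffSpec> BACK_OFF_FUNCTION =
            (record, ex) -> hasCause(ex, SQLTransientConnectionException.class)
                    ? INFINITE_RETRY
                    : null;

    /** Applies the function and falls back to the default when it returns null. */
    public static BackOffSpec resolve(Object record, Exception ex) {
        BackOffSpec chosen = BACK_OFF_FUNCTION.apply(record, ex);
        return chosen != null ? chosen : DEFAULT_BACK_OFF;
    }

    public static void main(String[] args) {
        Exception dbDown = new RuntimeException(
                new SQLTransientConnectionException("database unavailable"));
        Exception badPayload = new IllegalStateException("bad payload");

        System.out.println(resolve("record-1", dbDown).maxAttempts);
        System.out.println(resolve("record-2", badPayload).maxAttempts);
    }
}
```

In the real handler, the lambda passed to setBackOffFunction would play the role of BACK_OFF_FUNCTION and return a BackOff such as the FixedBackOff from the question instead of BackOffSpec.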