Spring boot version : 2.7.6 Spring kafka version : 2.8.11
Issue:
I was trying to handle the deserialization issues in code. To handle such issues in code, I created my own class by extending
DefaultErrorHandler
and overriding the public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {}
Sample code as below
public class CustomDefaultErrorHandler extends DefaultErrorHandler {
private static Logger log = LoggerFactory.getLogger(CustomDefaultErrorHandler.class);
@Override
public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
manageException(thrownException, consumer);
}
private void manageException(Exception ex, Consumer<?, ?> consumer) {
log.error("Error polling message: " + ex.getMessage());
if (ex instanceof RecordDeserializationException) {
RecordDeserializationException rde = (RecordDeserializationException) ex;
consumer.seek(rde.topicPartition(), rde.offset() + 1L);
consumer.commitSync();
} else {
log.error("Exception not handled");
}
}
}
If I use the @RetryableTopic along with @KafkaListener
@RetryableTopic(listenerContainerFactory = "kafkaListenerContainerFactory", backoff = @Backoff(delay = 8000, multiplier = 2.0),
dltStrategy = DltStrategy.FAIL_ON_ERROR
, traversingCauses = "true", autoCreateTopics = "true", numPartitions = "3", replicationFactor = "3",
fixedDelayTopicStrategy = FixedDelayStrategy.MULTIPLE_TOPICS, include = {RetriableException.class, RecoverableDataAccessException.class,
SQLTransientException.class, CallNotPermittedException.class}
)
@KafkaListener(topics = "${topic.name}", groupId = "order", containerFactory = "kafkaListenerContainerFactory", id = "OTR")
public void consumeOTRMessages(ConsumerRecord<String, PayloadsVO> payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topicName) throws JsonProcessingException {
logger.info("Payload :{}", payload.value());
payloadsService.savePayload(payload.value(), pegasusTopicName);
}
What I saw in debugging the code, @RetryableTopic has its own DefaultErrorHandler configurations in
ListenerContainerFactoryConfigurer
and it stops my custom handler and deserialization process wont stop on issue.
Can you please suggest any way since I wanted to use annotations for retry process in my code
I tried to configured my own implementation of
DefaultErrorHandler
by extending it and configured in
ConcurrentKafkaListenerContainerFactory
It's quite involved, but you should be able to override the RetryTopicComponentFactory
bean and override listenerContainerFactoryConfigurer()
to return your custom error handler.
That said, deserialization exceptions will go straight to the DLT anyway.
BTW, calling commitSync()
here is worthless because there were no records returned by the poll()
.