In my spring boot consumer using kafka I have configured DefaultErrorHandler
to retry failed event a couple of times.
Now, the issue is that after each retry I am seeing a stackTrace of the exception which ideally should not be logged (or if anything I am missing please point out).
My factory code :
public <K, V> ConcurrentKafkaListenerContainerFactory<K, V> createDlqContainerFactory(
ConsumerFactory<K, V> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<K, V> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
FixedBackOff fixedBackOff = new FixedBackOff(dlqRetryInterval, dlqMaxAttempts);
ConsumerRecordRecoverer recovery = (record, ex) -> {
log.error("Final retry failed for DLQ record: topic={}, partition={}, offset={}",
record.topic(), record.partition(), record.offset(), ex);
};
factory.setCommonErrorHandler(new DefaultErrorHandler(recovery, fixedBackOff));
return factory;
}
Ideally I would want only the Final retry error log, but instead I am getting a stacktrace after each retry
2024-12-25 11:31:57,003 ERROR[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] , KafkaMessageListenerContainer - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:227)
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:168)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2836)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2713)
at io.micrometer.observation.Observation.observe(Observation.java:565)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.transaction.processing.service.consumer.StockEventConsumer.consume(com.transaction.processing.service.model.eventmetadata.EventMetaData)' threw exception
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2869)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701)
... 10 common frames omitted
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:435)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800)
Caused by: java.lang.RuntimeException: My Exception Message
spring-kafka
: 3.2.4
Consumer config:
@Bean
public ConsumerFactory<String, EventMetaData> stockConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(EventMetaData.class));
}
@Bean(name = "commonKafkaTemplate")
public KafkaTemplate<String, EventMetaData> kafkaTemplate(
@Value("${kafka.common.brokers}") String brokers) {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, EventMetaData> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);
return new KafkaTemplate<>(producerFactory);
}
Is there any solution so that I am able to handle this
Looks like your concern has been mitigated in the latest Spring for Apache Kafka 3.3.0
: https://github.com/spring-projects/spring-kafka/issues/3409.
So, there is now a new RecordInRetryException
and no extra logs in between retries as you would expect.
Since this is a breaking change in the behavior we did not back-port the fix into 3.2.x
.