I'd need some clarification from Spring Kafka connoisseurs onto the reasoning behind the following error handling & behaviour.
My intention was to have an ExponentionalBackOff
exposed as the default CommonErrorHandler and catch essentially all possible exceptions. If an error occurs, some monitoring/alerting kicks in and I would be fixing whatever goes wrong. No DLQ configured for now, we would be blocking the queue.
@Bean
CommonErrorHandler commonErrorHandler() {
var exponentialBackOff = new ExponentialBackOff();
exponentialBackOff.setMaxInterval(Duration.ofHours(1).toMillis());
return new DefaultErrorHandler(exponentialBackOff);
}
The default consumption property based configuration:
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.springframework.kafka.support.serializer.DelegatingByTopicDeserializer
spring.kafka.consumer.properties.spring.kafka.key.serialization.bytopic.default=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties.spring.kafka.key.serialization.bytopic.config=\
topic.order.archive-invoice-event:org.apache.kafka.common.serialization.IntegerDeserializer
Listener looks like as follow:
@KafkaListener(topics = "topic.order.archive-invoice-event")
void onArchiveInvoice(ArchiveInvoiceEvent event) {
invoiceService.archiveInvoice(event.getOrderId());
}
If an exception occurs at runtime during
the consumption of an event, everything is fine, meaning the commonErrorHandler
takes over as expected.
However, I ran into an issue whereby the producer decided to change the type of the serialized key from Integer to String.
At runtime I got the following log:
Backoff FixedBackOff{interval=0, currentAttempts=1, maxAttempts=0} exhausted for topic.order.archive-invoice-event-1@489253
And stacktrace
o.a.k.c.e.SerializationException: Size of data received by IntegerDeserializer is not 4
at o.a.k.c.s.IntegerDeserializer.deserialize(IntegerDeserializer.java:30)
at o.a.k.c.s.IntegerDeserializer.deserialize(IntegerDeserializer.java:24)
at o.a.k.c.s.Deserializer.deserialize(Deserializer.java:62)
at o.s.k.s.s.DelegatingByTopicDeserializer.deserialize(DelegatingByTopicDeserializer.java:78)
at o.s.k.s.s.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:215)
... 16 common frames omitted
Wrapped by: o.s.k.s.s.DeserializationException: failed to deserialize
at o.s.k.s.s.SerializationUtils.deserializationException(SerializationUtils.java:158)
at o.s.k.s.s.ErrorHandlingDeserializer.deserialize(ErrorHandlingDeserializer.java:218)
at o.a.k.c.s.Deserializer.deserialize(Deserializer.java:73)
at o.a.k.c.c.i.CompletedFetch.parseRecord(CompletedFetch.java:319)
I retraced the exception and the only place where a FixedBackOff policy is used is in
FailedRecordProcessor (used by AbstractMessageListenerContainer
).
This FailedRecordProcessor applies some bits of magic and overules the commonErrorHandling
in case of classified
exceptions (such as DeserializationException).
Two questions here:
Thank you for your enlightenment
Dependencies:
3.3.5
3.2.4
Why? Is that really intended or have I misconfigured anything?
As mentioned by @artem the documentation does mention some fatal exceptions which bypass the custom error handling logic.
DeserializationException
MessageConversionException
ConversionException
MethodArgumentResolutionException
NoSuchMethodException
ClassCastException
Therefore, for anyone willing to apply the same error strategy independently of the origin of the error, the default provided by Spring can be overridden as follow:
@Bean
CommonErrorHandler commonErrorHandler() {
var exponentialBackOff = new ExponentialBackOff();
exponentialBackOff.setMaxInterval(Duration.ofHours(1).toMillis());
var handler = new DefaultErrorHandler(exponentialBackOff);
// unified error handling
handler.setClassifications(Map.of(), true);
return handler;
}