We use spring-kafka batch listeners with JSON payloads that are converted to Java DTOs automatically, so our listener methods currently look something like this:
@KafkaListener(topics = MY_TOPIC, containerFactory = "kafkaListenerContainerFactoryWithBatching",
        properties = {"max.poll.records=500"})
public void failingBatchReceiver(@Payload List<Dto> messages) {
    // ...
}
Our DTOs are annotated with JSR 303 Bean Validation annotations, and we want to log and skip any invalid message.
For record listeners we can put @Valid on the @Payload, but here this would mean we reject the whole batch if it contains one invalid message.
Question 1: Is there a simpler way than to inject a Validator, validate every message, log the error and ignore the message for the processing in every listener?
Another issue is conversion failures: if we pass in invalid JSON (e.g. a date in an unexpected format), spring-kafka passes in a null message without logging anything. This was unexpected for us. We have since realized that the original exception is available in the CONVERSION_FAILURES header. So again, we need to inject this header in each of our listeners. If we notice a null payload, we look up the same index in the header list and can then log the error. For investigation we also want to log the topic, partition and offset of the message, so we must also inject those headers into every listener.
Question 2: Is there a simpler way to make these conversion failures visible?
Question 3: Is it possible to inject all headers in one argument instead of using one argument per header as shown at https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html#batch-listeners, like we can for record listeners with ConsumerRecordMetadata?
This would make it simpler to pass the payloads and headers to some helper method that we can use to perform all these checks, log errors and remove invalid messages.
Question 1: Is there a simpler way than to inject a Validator, validate every message, log the error and ignore the message for the processing in every listener?
We still have not found one.
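For reference, the manual approach can at least be pulled out of the individual listeners. The following is only a sketch using plain JDK types: `BatchValidation`, `keepValid`, `violationsOf` and `log` are hypothetical names, not spring-kafka API. The validation itself is passed in as a function so the helper does not depend on a particular Validator.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper, not part of spring-kafka or Bean Validation.
final class BatchValidation {

    private BatchValidation() {
    }

    /**
     * Returns the messages that have no violations, in their original order.
     * Null entries (for example failed conversions) are dropped silently here
     * and are expected to be reported elsewhere.
     */
    static <T> List<T> keepValid(List<T> messages,
                                 Function<T, Collection<String>> violationsOf,
                                 Consumer<String> log) {
        List<T> valid = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            T message = messages.get(i);
            if (message == null) {
                continue;
            }
            Collection<String> violations = violationsOf.apply(message);
            if (violations.isEmpty()) {
                valid.add(message);
            } else {
                // Log and ignore the invalid message instead of failing the batch.
                log.accept("Ignoring invalid message at batch index " + i + ": " + violations);
            }
        }
        return valid;
    }
}
```

In a listener with an injected Validator, `violationsOf` could be something like `dto -> validator.validate(dto).stream().map(v -> v.getPropertyPath() + " " + v.getMessage()).toList()`, and `log` could be `logger::warn`.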
Question 2: Is there a simpler way to make these conversion failures visible?
We created https://github.com/spring-projects/spring-kafka/issues/3555 for this, but we will probably just handle the errors in the batch handler (extracted into a common helper).
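Such a common helper might look roughly like the sketch below. It assumes that the payload list and the header lists (conversion failures, topics, partitions, offsets) are index-aligned, as described in the question. `ConversionFailures` and `dropFailed` are hypothetical names; the lists would come from the corresponding batch listener headers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of spring-kafka.
final class ConversionFailures {

    private ConversionFailures() {
    }

    /**
     * Returns the non-null payloads and logs every null payload together with
     * its topic, partition, offset and the recorded conversion failure, if any.
     */
    static <T> List<T> dropFailed(List<T> payloads,
                                  List<? extends Exception> failures,
                                  List<String> topics,
                                  List<Integer> partitions,
                                  List<Long> offsets,
                                  Consumer<String> log) {
        List<T> converted = new ArrayList<>();
        for (int i = 0; i < payloads.size(); i++) {
            T payload = payloads.get(i);
            if (payload != null) {
                converted.add(payload);
                continue;
            }
            // The failure list may be shorter or hold null at this index.
            Exception cause = i < failures.size() ? failures.get(i) : null;
            String reason = cause != null
                    ? cause.getMessage()
                    : "null payload without recorded failure";
            log.accept("Skipping unconvertible record " + topics.get(i) + "-"
                    + partitions.get(i) + "@" + offsets.get(i) + ": " + reason);
        }
        return converted;
    }
}
```

The returned list contains only the successfully converted payloads, so the rest of the listener no longer has to deal with null entries.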
Question 3: Is it possible to inject all headers in one argument instead of using one argument per header as shown at https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html#batch-listeners, like we can for record listeners with ConsumerRecordMetadata? This would make it simpler to pass the payloads and headers to some helper method that we can use to perform all these checks, log errors and remove invalid messages.
Yes, we can inject @Headers Map<String, Object> headers with batch listeners as well.
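A minimal sketch of how such a map could be unpacked for a helper method follows. `BatchMetadata` is a hypothetical class. The string keys are assumptions about the values of the KafkaHeaders constants (CONVERSION_FAILURES, RECEIVED_TOPIC, RECEIVED_PARTITION, OFFSET) and may differ between spring-kafka versions, so real code should reference the constants instead. It also assumes that in a batch listener each of these header values is a list aligned with the payload list.

```java
import java.util.List;
import java.util.Map;

// Hypothetical holder for the per-record header lists of one batch.
final class BatchMetadata {

    // Assumed header keys; prefer the KafkaHeaders constants in real code.
    static final String CONVERSION_FAILURES = "kafka_conversionFailures";
    static final String RECEIVED_TOPIC = "kafka_receivedTopic";
    static final String RECEIVED_PARTITION = "kafka_receivedPartition";
    static final String OFFSET = "kafka_offset";

    private final List<?> conversionFailures;
    private final List<?> topics;
    private final List<?> partitions;
    private final List<?> offsets;

    private BatchMetadata(Map<String, Object> headers) {
        this.conversionFailures = list(headers, CONVERSION_FAILURES);
        this.topics = list(headers, RECEIVED_TOPIC);
        this.partitions = list(headers, RECEIVED_PARTITION);
        this.offsets = list(headers, OFFSET);
    }

    static BatchMetadata from(Map<String, Object> headers) {
        return new BatchMetadata(headers);
    }

    // Missing or non-list header values are treated as an empty list.
    private static List<?> list(Map<String, Object> headers, String key) {
        Object value = headers.get(key);
        return value instanceof List ? (List<?>) value : List.of();
    }

    List<?> conversionFailures() {
        return conversionFailures;
    }

    /** Formats "topic-partition@offset" for the record at the given batch index. */
    String describe(int index) {
        return topics.get(index) + "-" + partitions.get(index) + "@" + offsets.get(index);
    }
}
```

With this, a listener only needs the payload list and the single headers argument, and can hand both to a shared helper.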