I don't understand how to configure ErrorHandlingDeserializer so that it validates messages. This is my configuration:
    spring:
      json:
        use:
          type:
            headers: false
      kafka:
        consumer:
          key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          properties:
            spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
            spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
            spring.deserializer.validator.class: org.springframework.validation.beanvalidation.LocalValidatorFactoryBean
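But with this configuration, LocalValidatorFactoryBean doesn't validate messages against the jakarta.validation.constraints annotations.
How should a validator be configured with ErrorHandlingDeserializer?
Or is registering it through KafkaListenerConfigurer the only way to add a validator?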
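    import lombok.RequiredArgsConstructor;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.KafkaListenerConfigurer;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
    import org.springframework.validation.beanvalidation.LocalValidatorFactoryBean;

    @Configuration
    @RequiredArgsConstructor
    public class KafkaListenerConfig implements KafkaListenerConfigurer {

        private final LocalValidatorFactoryBean validator;

        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            // Register the validator used for listener method arguments
            registrar.setValidator(this.validator);
        }
    }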
Plain Bean Validation (adding the Validator to the registrar) works, but only for a listener that consumes a single payload, not for a batch listener:
    @KafkaListener(id = "businessListener", topics = "${app.topic}", batch = "true", idIsGroup = false)
    void listen(List<ConsumerRecord<String, @Valid Business>> business) {
        // not validated
    }

    @KafkaListener(id = "businessListener", topics = "${app.topic}", idIsGroup = false)
    void listen(@Valid Business business) {
        // validated
    }
Finally resolved by adding my own Validator and referencing it in the spring.deserializer.validator.class property:
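    import jakarta.validation.Validation;
    import org.springframework.validation.beanvalidation.SpringValidatorAdapter;

    public class DeserializerValidator extends SpringValidatorAdapter {

        public DeserializerValidator() {
            // Build a ready-to-use Jakarta validator in the no-arg constructor
            super(Validation.buildDefaultValidatorFactory().getValidator());
        }
    }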
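
For completeness, the registration of this class might look like the sketch below. It reuses the consumer settings from the question and only swaps the validator class. The package `com.example.kafka` is a hypothetical placeholder, not something from the original setup; use the real fully qualified name of DeserializerValidator.

```yaml
spring:
  kafka:
    consumer:
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        # Hypothetical package name; replace with the actual location of the class
        spring.deserializer.validator.class: com.example.kafka.DeserializerValidator
```

The validator is created from the class name given in the property rather than looked up as a Spring bean, which is why the class needs a no-argument constructor that produces a fully initialized validator.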