So i updated my avro with a default int attribute
{
"name": "minimum",
"type": [
"null",
"int"
],
"default": null
}
On the schema registry i have it set to forward compatibility.
My consumer exists in the same project as the producer so is using the same generated avro. while testing i noticed now the consumer is throwing the following error :
org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.avro.generic.GenericData$Record] to [com.example.dto.CarDto] for GenericMessage [payload={....}
I have checked the config and it has the followng set:
spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.specific.avro.reader=true
There are no dev tools, and im using spring boot 2.5
UPDATE: This has nothing to do with updating the avro file, i didnt notice but the exception was being suppressed. And i change i made now logs this. So my question still stands, some threads have said its because the avro class is in a different package?
my structure is:
src->main->avro->Car.avsc
src->main->java->com->example->dto->CarDto.java
src->main->java->com->example->configs->producer/consumer
src->main->java->com->example->service->ConsumerProcessor.java
my config
@Slf4j
@Configuration
public class ConsumerConfig {
@Value("${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value("${kafka.schemaRegistryUrl}")
private String schemaRegistryUrl;
// Consumers Configs
private Map<String, Object> getConsumerProps() {
final var props = new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-a");
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
private <T, K> ConcurrentKafkaListenerContainerFactory<K, T> getConcurrentKafkaListenerContainerFactory(final ConsumerFactory<K, T> kafkaConsumerFactory, final KafkaTemplate<K, T> template) {
final ConcurrentKafkaListenerContainerFactory<K, T> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
private KafkaAvroDeserializer getAvroDeserializer() {
final var deserializerProps = new HashMap<String, Object>();
deserializerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
deserializerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
final var deserializer = new KafkaAvroDeserializer();
deserializer.configure(deserializerProps, false);
return deserializer;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, com.exmaple.dto.CarDto> dataKafkaListenerContainerFactory(
final ConsumerFactory<Integer, com.example.dto.CarDto> kafkaConsumerFactory,
final KafkaTemplate<Integer, com.example.dto.CarDto> template) {
return getConcurrentKafkaListenerContainerFactory(kafkaConsumerFactory, template);
}
@Bean("kafkaListenerContainerFactory")
public ConsumerFactory<Integer, com.example.dto.CarDto> dataConsumerFactory() {
final Map<String, Object> props = getConsumerProps();
final KafkaAvroDeserializer deserializer = getAvroDeserializer();
return new DefaultKafkaConsumerFactory<>(props, new IntegerDeserializer(), new ErrorHandlingDeserializer(deserializer));
}
}
Here is my avro
{
"type": "record",
"name": “CarDto”,
"namespace": "com.example.dto",
"fields": [
{
"name": “id”,
"type": [
"null",
"int"
],
"default": null
},
{
"name": "minimum",
"type": ["null", {"type": "int", "logicalType": "date"}],
"default": null
}
]
}
my consumer
@Slf4j
@Service
public class dataConsumer implements AvroKafkaConsumer<CarDto> {
Integer counter = 0;
@Override
@KafkaListener(topics = “topic-a”, containerFactory = “dataKafkaListenerContainerFactory")
public void listen(final CarDto carDto, final Acknowledgment acknowledgment) {
acknowledgment.acknowledge();
}
@Override
public String getName() {
return “data”;
}
}
So i noticed that the containerFactory is named something different to in the consumer service. "dataKafkaListenerContainerFactory" does that matter?
while in the config its
@Bean("kafkaListenerContainerFactory")
public ConsumerFactory<Integer, com.example.dto.CarDto> dataConsumerFactory() {
SO i did some clean up and removed methods and added the following
private Map<String, Object> getProps() {
final var props = new HashMap<String, Object>();
……. existing props
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Integer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
return props;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, CarDto> myKafkaContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<Integer, CarDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
public ConsumerFactory<Integer, CarDto> kafkaConsumerFactory() {
final Map<String, Object> props = getProps();
final KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
return new DefaultKafkaConsumerFactory<>(props, new IntegerDeserializer(), new ErrorHandlingDeserializer(deserializer));
}
edited the config
private Map<String, Object> getProps() {
final var props = new HashMap<String, Object>();
……. existing props
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Integer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
return props;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, CarDto> myKafkaContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<Integer, CarDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
public ConsumerFactory<Integer, CarDto> kafkaConsumerFactory() {
final Map<String, Object> props = getProps();
final KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
return new DefaultKafkaConsumerFactory<>(props, new IntegerDeserializer(), new ErrorHandlingDeserializer(deserializer));
}