java, apache-kafka, spring-kafka, avro, spring-messaging

Kafka throwing an error after updating Avro schema (Spring Boot)


So I updated my Avro schema, adding an optional int field with a null default:

{
  "name": "minimum",
  "type": [
    "null",
    "int"
  ],
  "default": null
}

On the Schema Registry I have the subject set to forward compatibility.
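
For reference, the compatibility level can also be set per subject with the Confluent client; a minimal sketch, assuming the registry runs at localhost:8081 and the default TopicNameStrategy (so the value subject for topic-a is topic-a-value):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

// Sketch: set FORWARD compatibility for the value subject of topic-a.
public class SetCompatibility {
    public static void main(String[] args) throws Exception {
        final SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);
        client.updateCompatibility("topic-a-value", "FORWARD");
    }
}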

My consumer lives in the same project as the producer, so it uses the same generated Avro class. While testing, I noticed the consumer now throws the following error:

org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.avro.generic.GenericData$Record] to [com.example.dto.CarDto] for GenericMessage [payload={....}

I have checked the config, and it has the following set:

spring.kafka.consumer.value-deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.kafka.consumer.properties.specific.avro.reader=true

There is no spring-boot-devtools in the project, and I'm using Spring Boot 2.5.
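
Note that the spring.kafka.consumer.* entries only configure Spring Boot's auto-configured ConsumerFactory; a factory built by hand in a @Configuration class, like the one below, never sees them, so specific.avro.reader must also be set on that factory's own properties. A minimal sketch of carrying the Boot properties over (class and bean names are illustrative, not from the project):

import java.util.Map;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import com.example.dto.CarDto;

// Sketch: reuse Boot's KafkaProperties so settings from application.properties,
// such as specific.avro.reader=true, reach a custom consumer factory.
@Configuration
class BootBackedConsumerConfig {

    @Bean
    public ConsumerFactory<Integer, CarDto> carConsumerFactory(final KafkaProperties kafkaProperties) {
        final Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        return new DefaultKafkaConsumerFactory<>(props);
    }
}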

UPDATE: This has nothing to do with updating the Avro file. I hadn't noticed that the exception was being suppressed, and a change I made now logs it. So my question still stands; some threads have said it's because the Avro class is in a different package?

My structure is:

src->main->avro->Car.avsc

src->main->java->com->example->dto->CarDto.java

src->main->java->com->example->configs->producer/consumer

src->main->java->com->example->service->ConsumerProcessor.java

My config:

@Slf4j
@Configuration
public class KafkaConsumerConfig { // renamed from "ConsumerConfig", which would shadow org.apache.kafka.clients.consumer.ConsumerConfig used below

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;


    @Value("${kafka.schemaRegistryUrl}")
    private String schemaRegistryUrl;

    // Consumers Configs

    private Map<String, Object> getConsumerProps() {
        final var props = new HashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-a");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        return props;
    }

    private <T, K> ConcurrentKafkaListenerContainerFactory<K, T> getConcurrentKafkaListenerContainerFactory(final ConsumerFactory<K, T> kafkaConsumerFactory, final KafkaTemplate<K, T> template) {
        final ConcurrentKafkaListenerContainerFactory<K, T> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory);

        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    private KafkaAvroDeserializer getAvroDeserializer() {
        final var deserializerProps = new HashMap<String, Object>();
        deserializerProps.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        deserializerProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        final var deserializer = new KafkaAvroDeserializer();
        deserializer.configure(deserializerProps, false); // isKey = false: configure as the value deserializer
        return deserializer;
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, com.example.dto.CarDto> dataKafkaListenerContainerFactory(
            final ConsumerFactory<Integer, com.example.dto.CarDto> kafkaConsumerFactory,
            final KafkaTemplate<Integer, com.example.dto.CarDto> template) {
        return getConcurrentKafkaListenerContainerFactory(kafkaConsumerFactory, template);
    }

    @Bean("kafkaListenerContainerFactory")
    public ConsumerFactory<Integer, com.example.dto.CarDto> dataConsumerFactory() {
        final Map<String, Object> props = getConsumerProps();
        final KafkaAvroDeserializer deserializer = getAvroDeserializer();

        return new DefaultKafkaConsumerFactory<>(props, new IntegerDeserializer(), new ErrorHandlingDeserializer(deserializer));
    }
}

Here is my Avro schema:

{
  "type": "record",
  "name": "CarDto",
  "namespace": "com.example.dto",
  "fields": [
    {
      "name": "id",
      "type": [
        "null",
        "int"
      ],
      "default": null
    },
    {
      "name": "minimum",
      "type": ["null", {"type": "int", "logicalType": "date"}],
      "default": null
    }
  ]
}
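
For what it's worth, this is the schema the CarDto class is generated from; a minimal sketch of how the generated class is used (assuming Avro 1.10+ codegen, where the date logical type maps to java.time.LocalDate):

import java.time.LocalDate;

import com.example.dto.CarDto;

// Sketch: build a record with the generated builder; both fields are nullable unions.
public class CarDtoExample {
    public static void main(String[] args) {
        final CarDto car = CarDto.newBuilder()
                .setId(1)                    // ["null","int"] maps to Integer
                .setMinimum(LocalDate.now()) // nullable date logical type
                .build();
        System.out.println(car);
    }
}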

My consumer:

@Slf4j
@Service
public class DataConsumer implements AvroKafkaConsumer<CarDto> {

    Integer counter = 0;

    @Override
    @KafkaListener(topics = "topic-a", containerFactory = "dataKafkaListenerContainerFactory")
    public void listen(final CarDto carDto, final Acknowledgment acknowledgment) {

        acknowledgment.acknowledge();
    }

    @Override
    public String getName() {
        return "data";
    }
}

So I noticed that the containerFactory referenced by the consumer service, "dataKafkaListenerContainerFactory", is named differently from the bean in the config. Does that matter?

While in the config it's:

@Bean("kafkaListenerContainerFactory")
public ConsumerFactory<Integer, com.example.dto.CarDto> dataConsumerFactory() {
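
It does matter: the containerFactory attribute on @KafkaListener must name a ConcurrentKafkaListenerContainerFactory bean, and "kafkaListenerContainerFactory" is the default bean name Spring falls back to when the attribute is omitted; here that name sits on a ConsumerFactory instead. A minimal sketch of consistent naming (a guess at the intent, not the posted code):

// Sketch: the listener's containerFactory = "dataKafkaListenerContainerFactory"
// must match the container factory bean's name, not the ConsumerFactory's.
@Bean("dataKafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Integer, CarDto> dataKafkaListenerContainerFactory(
        final ConsumerFactory<Integer, CarDto> dataConsumerFactory) {
    final ConcurrentKafkaListenerContainerFactory<Integer, CarDto> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(dataConsumerFactory);
    return factory;
}

@Bean // a ConsumerFactory is injected by type; the listener never references it by name
public ConsumerFactory<Integer, CarDto> dataConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getConsumerProps(),
            new IntegerDeserializer(), new ErrorHandlingDeserializer(getAvroDeserializer()));
}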

So I did some cleanup, removed methods, and added the following:

private Map<String, Object> getProps() {
    final var props = new HashMap<String, Object>();
    // ... existing props

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Integer.class);
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);

    return props;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, CarDto> myKafkaContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<Integer, CarDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

public ConsumerFactory<Integer, CarDto> kafkaConsumerFactory() {
    final Map<String, Object> props = getProps();
    final KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();

    return new DefaultKafkaConsumerFactory<>(props, new IntegerDeserializer(), new ErrorHandlingDeserializer(deserializer));
}

Solution

  • Edited the config as shown in the cleanup above: the key and value deserializer classes are set in the consumer properties, ErrorHandlingDeserializer delegates to KafkaAvroDeserializer, and specific.avro.reader is true, so records now deserialize into the generated CarDto.

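A follow-up note on ErrorHandlingDeserializer: it catches deserialization failures and hands them to the container's error handler as a DeserializationException, so pairing it with an error handler keeps such failures visible in the logs. A minimal sketch, assuming spring-kafka 2.7 (the version shipped with Boot 2.5):

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

// Sketch: the same factory as above, with an error handler attached so records
// that fail deserialization are logged rather than silently dropped.
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, CarDto> myKafkaContainerFactory() {
    final ConcurrentKafkaListenerContainerFactory<Integer, CarDto> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory());
    factory.setErrorHandler(new SeekToCurrentErrorHandler()); // logs the failure after retries are exhausted
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}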