java apache-kafka spring-kafka avro confluent-schema-registry

KafkaListener throws "org.springframework.messaging.converter.MessageConversionException" when deserializing to an Avro POJO class


I am building a Kafka Listener / Consumer with Spring Boot that consumes Avro data from a topic.

Here are some of the properties:

spring.kafka.properties.specific.avro.reader=true
spring.kafka.consumer.properties.acks=all
spring.kafka.consumer.properties.auto.offset.reset=latest
spring.kafka.properties.schema.registry.url=http://10.0.99.111:8081
spring.kafka.properties.schema.registry.ssl.truststore.location=/sr.truststore.jks
spring.kafka.properties.schema.registry.ssl.truststore.password=password
spring.kafka.properties.auto.register.schemas=false

This is the listener function:

@KafkaListener(topics = "TOPIC", groupId = "GROUP_ID",
        properties = {
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":io.confluent.kafka.serializers.KafkaAvroDeserializer"
        })
public void listen(Value message) throws Exception {
    logger.debug("Consumed from kafka {}", message.toString());
}
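
As a side check on the `properties` attribute: the Spring Kafka Javadoc for `@KafkaListener` describes these entries as using Java properties-file syntax, so the `key:value` form above should be read the same way as `key=value`. A minimal sketch using only `java.util.Properties` (the class name `ListenerPropertySyntax` is made up for illustration) shows how such an entry splits:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class ListenerPropertySyntax {

    /** Parses a single entry such as "key:value" or "key=value" in properties-file syntax. */
    static Properties parse(String entry) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(entry));
        } catch (IOException e) {
            // Not expected for an in-memory reader; rethrown unchecked to keep the sketch simple.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // The literal that ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":..." evaluates to.
        Properties props = parse(
                "value.deserializer:io.confluent.kafka.serializers.KafkaAvroDeserializer");
        System.out.println(props.getProperty("value.deserializer"));
    }
}
```

Since the entry parses cleanly, its syntax is unlikely to be what causes the conversion error.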

The error:

2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.mykafkalistener.Value] for GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}], failedMessage=GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}]
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:87)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:52)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2044)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] ... 10 common frames omitted
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.mykafkalistener.Value] for GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}]
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:910)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] ... 13 common frames omitted

When I changed the method to listen(ConsumerRecord<String, Value> message), the error changed to the following, and the offset was skipped afterwards:

2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}], failedMessage=GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}]
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:87)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:52)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2044)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] ... 10 common frames omitted
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}]
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:910)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] ... 13 common frames omitted

The POJO class is auto-generated:

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/entity/kafka</outputDirectory>
                            <stringType>String</stringType>
                            <enableDecimalLogicalType>true</enableDecimalLogicalType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <finalName>${project.artifactId}</finalName>
    </build>

I've set specific.avro.reader to true, but the conversion still fails.

What is the B class? Why is the data treated as B before conversion, and how did it become B? I don't have any such class in my code.
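
A note on the `[B` in the message: it is the JVM's runtime name for the `byte[]` type, so `[[B]` in the log means the payload reached the message converter as a raw byte array, not as a class from the application. A minimal sketch that shows the naming (the class `ByteArrayName` is only for illustration):

```java
final class ByteArrayName {

    /** Returns the JVM's runtime class name for the given object. */
    static String typeNameOf(Object value) {
        return value.getClass().getName();
    }

    public static void main(String[] args) {
        byte[] payload = new byte[124]; // same length as the payload in the log
        System.out.println(typeNameOf(payload));             // runtime name of byte[]
        System.out.println("[" + typeNameOf(payload) + "]"); // bracketed, as in the log message
        System.out.println(typeNameOf("text"));              // an ordinary class, for comparison
    }
}
```

In other words, the listener was handed bytes that had not been turned into an Avro object before Spring tried to convert them to `Value`.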

I tried both producing and consuming with this Value class. Producing worked fine, but consuming did not.

What went wrong? How can I make the listener deserialize the data into that POJO? What should I do?


Update:

I have tried adding a consumer configuration like this:

@EnableKafka
@Configuration
public class KafkaConsumerConfiguration {
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        KafkaProperties properties = new KafkaProperties();
        return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(), new StringDeserializer(), new KafkaAvroDeserializer());
    }

    /**
     * This is to deserialize kafka data that has the type:<br>
     * <ul>
     *     <li>Key = String</li>
     *     <li>Value = Avro</li>
     * </ul>
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerAvroFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

But then the log went quiet, as if the listener was not receiving anything.

When I changed the KafkaProperties initialization to use @Autowired, this is the error I received:

2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition TOPIC-0 at offset 98. If needed, please seek past the record to continue consumption.
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] [07-05-2024 09:42:53.844] [DEBUG] [] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer] - Commit list: {}
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] [07-05-2024 09:42:53.844] [ERROR] [] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer] - Consumer exception
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1401)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
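
For context on `Unknown magic byte!`: Confluent's Avro serializer frames each message as one magic byte (`0`), a 4-byte big-endian schema ID, and then the Avro binary data, and the deserializer raises this error when the first byte is not `0`. That usually points to a record on the topic that was not written with the Schema Registry serializer. A small sketch for inspecting a raw payload, assuming the value can be obtained as `byte[]` (for example with a `ByteArrayDeserializer`); `WireFormatCheck` and its method name are hypothetical:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class WireFormatCheck {

    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    /** Returns the schema ID if the payload carries the Confluent framing, or -1 otherwise. */
    static int schemaIdOrMinusOne(byte[] payload) {
        if (payload == null || payload.length < HEADER_LENGTH || payload[0] != MAGIC_BYTE) {
            return -1;
        }
        // ByteBuffer reads big-endian by default, matching the wire format.
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] framed = {0, 0, 0, 0, 42, 2, 6};                       // magic byte, schema ID, data
        byte[] plainJson = "{\"a\":1}".getBytes(StandardCharsets.UTF_8); // no framing at all
        System.out.println(schemaIdOrMinusOne(framed));
        System.out.println(schemaIdOrMinusOne(plainJson));
    }
}
```

A result of -1 for the record at the failing offset would suggest that the producer of that record used a different serializer.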

Solution

  • This may not really solve the problem of parsing into the POJO, but I decided to take another route and let the listener receive the value as a GenericRecord:

    @KafkaListener(topics = "TOPIC", groupId = "GROUP_ID",
            properties = {
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":io.confluent.kafka.serializers.KafkaAvroDeserializer"
            })
    public void listen(GenericRecord message) throws Exception {
        logger.debug("Consumed from kafka {}", message.toString());
    }
    

    Now it consumes from the topic normally, but I need an extra step to read the fields out of the GenericRecord. Any other solution is warmly welcome!
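
Separately, the `IllegalStateException` in the update asks for an `ErrorHandlingDeserializer`. Below is a hedged sketch of consumer properties that wrap the Avro deserializer, written as a plain map so that it runs without Spring on the classpath. It assumes Spring Kafka 2.5 or later (where the class has this name); `ErrorHandlingConsumerProps` is a hypothetical helper, and the string keys are the literal values behind `ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG` and `ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ErrorHandlingConsumerProps {

    /** Builds value-deserializer overrides that delegate to the Confluent Avro deserializer. */
    static Map<String, Object> build() {
        Map<String, Object> props = new LinkedHashMap<>();
        // The outer deserializer catches failures and hands them to the container's error handler.
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer");
        // The delegate performs the actual Avro decoding.
        props.put("spring.deserializer.value.delegate.class",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Same setting as in the question: ask for generated classes instead of GenericRecord.
        props.put("specific.avro.reader", true);
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting map can be merged into the properties passed to a `DefaultKafkaConsumerFactory`. This does not repair an unreadable record; it only lets the container's error handler deal with it instead of failing on the same offset repeatedly.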