I am building a Kafka Listener / Consumer with Spring Boot that consumes Avro data from a topic.
Here are some of the properties:
spring.kafka.properties.specific.avro.reader=true
spring.kafka.consumer.properties.acks=all
spring.kafka.consumer.properties.auto.offset.reset=latest
spring.kafka.properties.schema.registry.url=http://10.0.99.111:8081
spring.kafka.properties.schema.registry.ssl.truststore.location=/sr.truststore.jks
spring.kafka.properties.schema.registry.ssl.truststore.password=password
spring.kafka.properties.auto.register.schemas=false
This is the listener function:
@KafkaListener(topics = "TOPIC", groupId = "GROUP_ID",
        properties = {
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":io.confluent.kafka.serializers.KafkaAvroDeserializer"
        })
public void listen(Value message) throws Exception {
    logger.debug("Consumed from kafka {}", message.toString());
}
The error:
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.mykafkalistener.Value] for GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}], failedMessage=GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}]
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:87)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:52)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2044)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] ... 10 common frames omitted
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.mykafkalistener.Value] for GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}]
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:910)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] ... 13 common frames omitted
When I changed the method signature to listen(ConsumerRecord<String, Value> message), the error changed to the following, and the offset was skipped afterwards:
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}], failedMessage=GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}]
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:87)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:52)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2044)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] ... 10 common frames omitted
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}]
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:910)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] ... 13 common frames omitted
The POJO class is auto-generated:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>${avro.version}</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/entity/kafka</outputDirectory>
                        <stringType>String</stringType>
                        <enableDecimalLogicalType>true</enableDecimalLogicalType>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
    <finalName>${project.artifactId}</finalName>
</build>
I've set specific.avro.reader to true, but it does not work.
What is the [B class? Why is the data treated as [B before conversion, and where does it come from? I don't have such a class in my code.
I tried producing and consuming with this Value class. Producing worked fine, but consuming did not.
What went wrong? How can I make the listener parse the data into that POJO?
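On the [B part of the error message: this is how the JVM prints the class name of a byte[] array, so "Cannot convert from [[B] to [com.mykafkalistener.Value]" means the listener received the raw bytes of the record rather than a deserialized object. A minimal sketch that shows where the name comes from (the class and method names here are only for illustration):

```java
public class ByteArrayClassName {

    // Returns the JVM's binary name for the byte[] type.
    static String byteArrayName() {
        return byte[].class.getName();
    }

    public static void main(String[] args) {
        // Same length as the payload in the log; the contents do not matter here.
        byte[] payload = new byte[124];

        System.out.println(byteArrayName());               // prints [B
        System.out.println(payload.getClass().getName());  // prints [B
    }
}
```

The leading "[" marks an array and "B" stands for the primitive type byte, so no class named B has to exist in the application.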
Update:
I have tried adding a consumer configuration class like this:
@EnableKafka
@Configuration
public class KafkaConsumerConfiguration {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        KafkaProperties properties = new KafkaProperties();
        return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(), new StringDeserializer(), new KafkaAvroDeserializer());
    }

    /**
     * This is to deserialize kafka data that has the type:<br>
     * <ul>
     * <li>Key = String</li>
     * <li>Value = Avro</li>
     * </ul>
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerAvroFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
But then the log went quiet, as if the listener was not receiving anything.
When I changed the KafkaProperties instance to be injected with @Autowired instead of created with new, I received this error:
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition TOPIC-0 at offset 98. If needed, please seek past the record to continue consumption.
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] [07-05-2024 09:42:53.844] [DEBUG] [] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer] - Commit list: {}
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] [07-05-2024 09:42:53.844] [ERROR] [] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer] - Consumer exception
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1401)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
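For context on "Unknown magic byte!": KafkaAvroDeserializer expects each record in the Confluent wire format, which starts with a single magic byte of 0 followed by a 4-byte big-endian schema ID, and it reports this error when the first byte is something else. The following is a rough sketch for inspecting raw record bytes (for example, bytes read with a ByteArrayDeserializer), assuming that wire format. The class, the method names, and the sample bytes are made up for illustration and are not part of any library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WireFormatCheck {

    // Confluent wire format: 1 magic byte (0x0), then a 4-byte big-endian schema ID.
    static final byte MAGIC_BYTE = 0x0;

    /** True if the payload is long enough for the header and starts with the magic byte. */
    static boolean looksLikeConfluentAvro(byte[] payload) {
        return payload != null && payload.length >= 5 && payload[0] == MAGIC_BYTE;
    }

    /** Reads the schema ID from bytes 1..4; call only after the check above passes. */
    static int schemaId(byte[] payload) {
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // Hypothetical framed record: magic byte 0, schema ID 42, then two payload bytes.
        byte[] framed = {0, 0, 0, 0, 42, 2, 6};
        // Hypothetical record written without the Confluent serializer (plain JSON text).
        byte[] plain = "{\"a\":1}".getBytes(StandardCharsets.UTF_8);

        System.out.println(looksLikeConfluentAvro(framed)); // prints true
        System.out.println(schemaId(framed));               // prints 42
        System.out.println(looksLikeConfluentAvro(plain));  // prints false
    }
}
```

If a record on the topic fails this kind of check, it was probably not written by the Confluent Avro serializer, which would be one possible explanation for the exception at that offset.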
This does not really solve the problem of parsing into the POJO, but I decided to take another route and let the listener receive the value as a GenericRecord:
@KafkaListener(topics = "TOPIC", groupId = "GROUP_ID",
        properties = {
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":io.confluent.kafka.serializers.KafkaAvroDeserializer"
        })
public void listen(GenericRecord message) throws Exception {
    logger.debug("Consumed from kafka {}", message.toString());
}
Now it consumes from the topic normally, but I need an additional workaround to map the GenericRecord to my POJO. Any other solution is warmly welcomed!