java apache-kafka spring-kafka spring-messaging

How do I deserialize a Kafka message to a POJO?


I am having issues trying to deserialize a Kafka message to a POJO using Spring Kafka. I want to use the key and value parts of the message to construct the POJO.

The Kafka message key is a string and the message value is JSON.

I've tried deserializing just the value portion of the message by following the tutorials at codenotfound.com and baeldung.com. However, I also want the key in the POJO, and in my case the Java application is not the one producing the message.

How do I get the Java application to deserialize a Kafka message into a POJO correctly?

For example:

key = "test"

{
  "value1": "1st value",
  "value2": "2nd value"
}

A reproducible example of what I am trying can be found at: https://github.com/gl3h/Simple-Consumer

To reproduce the issue the following things have to be done:

  1. Run the command docker-compose up -d to bring up 3 instances of Zookeeper and Kafka. It also brings up Kafdrop that connects to the Kafka cluster.

  2. Run the Java application. (gradle bootRun)

  3. Send a message to the data topic

    kafka-console-producer --broker-list kafka1:29092 --topic data --property "parse.key=true" --property "key.separator=&"
    test&{"value1":"1st value","value2":"2nd value"}

Whenever a message is sent to the Kafka cluster, the Java application fails to convert the message to the Data POJO, with the following error:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.consumer.example.ExampleConsumer.processData(com.example.consumer.example.Data)]
Bean [com.example.consumer.example.ExampleConsumer@7a5a16cf]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.consumer.example.Data] for GenericMessage [payload={"value1":"value","value2":"value"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ff54c21, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=test, kafka_receivedPartitionId=0, kafka_receivedTopic=data, kafka_receivedTimestamp=1583036480453, kafka_groupId=data_consumer}], failedMessage=GenericMessage [payload={"value1":"value","value2":"value"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ff54c21, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=test, kafka_receivedPartitionId=0, kafka_receivedTopic=data, kafka_receivedTimestamp=1583036480453, kafka_groupId=data_consumer}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.consumer.example.Data] for GenericMessage [payload={"value1":"value","value2":"value"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ff54c21, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=test, kafka_receivedPartitionId=0, kafka_receivedTopic=data, kafka_receivedTimestamp=1583036480453, kafka_groupId=data_consumer}], failedMessage=GenericMessage [payload={"value1":"value","value2":"value"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ff54c21, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=test, kafka_receivedPartitionId=0, kafka_receivedTopic=data, 
kafka_receivedTimestamp=1583036480453, kafka_groupId=data_consumer}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1641) [spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1630) [spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1546) [spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1487) [spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1401) [spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1165) [spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949) [spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:884) [spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_221]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_221]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_221]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.consumer.example.Data] for GenericMessage [payload={"value1":"value","value2":"value"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ff54c21, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=test, kafka_receivedPartitionId=0, kafka_receivedTopic=data, kafka_receivedTimestamp=1583036480453, kafka_groupId=data_consumer}], failedMessage=GenericMessage [payload={"value1":"value","value2":"value"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ff54c21, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=test, kafka_receivedPartitionId=0, kafka_receivedTopic=data, kafka_receivedTimestamp=1583036480453, kafka_groupId=data_consumer}]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:314) ~[spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592) [spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575) [spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534) [spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    ... 8 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.example.consumer.example.Data] for GenericMessage [payload={"value1":"value","value2":"value"}, headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2ff54c21, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=test, kafka_receivedPartitionId=0, kafka_receivedTopic=data, kafka_receivedTimestamp=1583036480453, kafka_groupId=data_consumer}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:905) ~[spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:304) ~[spring-kafka-2.3.0.RELEASE.jar:2.3.0.RELEASE]
    ... 13 common frames omitted

Do I have to write a custom deserializer?
And what would the custom deserializer look like?


Solution

  • Welcome to Stack Overflow!

    By default, Spring Boot's Kafka auto-configuration uses a StringDeserializer when consuming messages, which is why the payload reaches your listener as a String. Since your message value is JSON, the first step is to register JsonDeserializer.class as the value deserializer. That takes care of the message value, but it does not give you the key, which you also want.

    In Kafka, the key and value deserializers are independent of each other, so I don't think there is an easy way to get at the key while deserializing the value. The easiest options are probably:

    1. Make the key part of your JSON object so it is deserialized automatically by the JsonDeserializer.

    2. On the consumer side, receive a ConsumerRecord instead of the object itself. It gives you both the deserialized key and the deserialized value, so you can simply add the key to the deserialized object using a setter.
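
Option 2 can be sketched roughly as follows. This is a JDK-only illustration, not code from the repository: KeyedRecord is a hypothetical stand-in for Kafka's ConsumerRecord<String, Data> (which exposes the same key() and value() accessors), and the Data fields and the setKey setter are assumptions based on the example message in the question.

```java
// Sketch of option 2: copy the record key into the already-deserialized value.
// KeyedRecord is a hypothetical stand-in for ConsumerRecord<String, Data>,
// so the example runs without Kafka on the classpath.
class KeyMergeSketch {

    // Assumed shape of the POJO; field names follow the JSON in the question.
    static class Data {
        private String key;
        private final String value1;
        private final String value2;

        Data(String value1, String value2) {
            this.value1 = value1;
            this.value2 = value2;
        }

        void setKey(String key) { this.key = key; }

        String getKey() { return key; }

        @Override
        public String toString() {
            return "Data{key=" + key + ", value1=" + value1 + ", value2=" + value2 + "}";
        }
    }

    // Hypothetical record type exposing key() and value() like ConsumerRecord.
    static class KeyedRecord {
        private final String key;
        private final Data value;

        KeyedRecord(String key, Data value) {
            this.key = key;
            this.value = value;
        }

        String key() { return key; }

        Data value() { return value; }
    }

    // What the listener body would do for each incoming record.
    static Data mergeKey(KeyedRecord record) {
        Data data = record.value();
        data.setKey(record.key());
        return data;
    }

    public static void main(String[] args) {
        KeyedRecord record = new KeyedRecord("test", new Data("1st value", "2nd value"));
        // prints: Data{key=test, value1=1st value, value2=2nd value}
        System.out.println(mergeKey(record));
    }
}
```

In a real listener, the method parameter would be ConsumerRecord<String, Data> record instead of Data data, with the JsonDeserializer still configured for the value.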

    I hope that helps to clarify. I took a quick look at your example on GitHub and opened a PR. To fix it using the first approach, with the key as part of the message payload (check the PR in your repo):

    Add the key to the Data object as a property, and for your consumer use:

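    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class ExampleConsumer {

        // The JsonDeserializer configured in application.yml has already
        // turned the message value into a Data instance by the time this runs.
        @KafkaListener(topics = "data")
        public void processData(Data data) {
            System.out.println("Data:" + data);
        }
    }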
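
The Data class itself is not shown here. A minimal sketch of what it might look like with the key added as a property, assuming the field names mirror the JSON in the question and that the producer also writes a key field into the payload (the real class lives in the linked repository and may differ):

```java
// Hypothetical Data POJO with the key carried inside the JSON payload.
// Jackson, which JsonDeserializer relies on, can populate it through the
// no-arg constructor and the setters.
class Data {
    private String key;     // assumed to be duplicated into the payload by the producer
    private String value1;
    private String value2;

    public Data() { }

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }

    public String getValue1() { return value1; }
    public void setValue1(String value1) { this.value1 = value1; }

    public String getValue2() { return value2; }
    public void setValue2(String value2) { this.value2 = value2; }

    @Override
    public String toString() {
        return "Data{key=" + key + ", value1=" + value1 + ", value2=" + value2 + "}";
    }

    // Small demo that fills the POJO by hand, the way the deserializer would.
    public static void main(String[] args) {
        Data data = new Data();
        data.setKey("test");
        data.setValue1("1st value");
        data.setValue2("2nd value");
        System.out.println(data);
    }
}
```

With this shape, the console producer line from the question would need to carry the key in the payload as well, for example test&{"key":"test","value1":"1st value","value2":"2nd value"}.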
    

    And add the proper configuration in application.yml:

    spring:
      application:
        name: kafka-consumer-example
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
          group-id: data_consumer
          client-id: ${spring.application.name}
          properties:
            spring.json.value.default.type: com.example.consumer.example.Data
            spring.json.type.mapping: "data:com.example.consumer.example.Data"
            spring.json.trusted.packages: "*"
    
        listener:
          missing-topics-fatal: false
    

    P.S. To run Kafka locally, I would recommend a docker-compose file with a single Zookeeper and a single Kafka broker. See this example: https://dev.to/thegroo/spring-kafka-producer-and-consumer-41oc or this other one: https://dev.to/thegroo/one-to-run-them-all-1mg6