spring-bootspring-kafka

Spring Kafka JsonMessageConverter throws UnsupportedOperationException when sending messages


@Configuration
public class Config {
    @Bean
    public RecordMessageConverter converter() {
        return new JsonMessageConverter();
    }
    
    @Bean
    public BatchMessagingMessageConverter batchConverter() {
        return new BatchMessagingMessageConverter(converter());
    }
}

@Component
@RequiredArgsConstructor
public class CancelAuthorizationLinkageListener {
    private final KafkaTemplate<String, CancelAuthorizationLinkage> kafkaTemplate;
    
    @KafkaListener(
            id = "${spring.kafka.listener.cancel-auth-linkage.id}",
            topics = "${spring.kafka.listener.cancel-auth-linkage.topic.linkage}",
            autoStartup = "false",
            batch = "true",
            groupId = "cushion")
    public void listen(List<Message<CancelAuthorizationLinkage>> messages) {
        // other operations...
        messages.forEach(message -> kafkaTemplate.send(build));
        kafkaTemplate.send(build);
    }
}
spring:
  ...
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      acks: -1
      transaction-id-prefix: cushion-kafka-tx
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      max-poll-records: 100
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      properties:
        isolation.level: read_committed
        spring.json.trusted.packages: "*"
  ...
When trying to send messages, I'm getting the following exception:
``` java
Caused by: java.lang.UnsupportedOperationException: Select a subclass that creates a ProducerRecord value corresponding to the configured Kafka Serializer
    at org.springframework.kafka.support.converter.JsonMessageConverter.convertPayload(JsonMessageConverter.java:96)
    at org.springframework.kafka.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:251)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:583)

It seems that the JsonMessageConverter is unable to convert the message payload. I've configured the JsonMessageConverter and BatchMessagingMessageConverter beans, but the error persists. What could be causing this issue, and how can I resolve it? Are there any additional configurations or changes I need to make to ensure proper message conversion when sending?


Solution

  • See JsonMessageConverter source code:

    protected Object convertPayload(Message<?> message) {
        throw new UnsupportedOperationException("Select a subclass that creates a ProducerRecord value "
                + "corresponding to the configured Kafka Serializer");
    }
    

    and that is exactly what you see in your logs. Since you use org.apache.kafka.common.serialization.ByteArrayDeserializer, your choice of extension should be org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter. I mean that converter() bean must use this class instead of that super.