    @Configuration
    public class Config {

        @Bean
        public RecordMessageConverter converter() {
            return new JsonMessageConverter();
        }

        @Bean
        public BatchMessagingMessageConverter batchConverter() {
            return new BatchMessagingMessageConverter(converter());
        }
    }
    @Component
    @RequiredArgsConstructor
    public class CancelAuthorizationLinkageListener {

        private final KafkaTemplate<String, CancelAuthorizationLinkage> kafkaTemplate;

        @KafkaListener(
                id = "${spring.kafka.listener.cancel-auth-linkage.id}",
                topics = "${spring.kafka.listener.cancel-auth-linkage.topic.linkage}",
                autoStartup = "false",
                batch = "true",
                groupId = "cushion")
        public void listen(List<Message<CancelAuthorizationLinkage>> messages) {
            // other operations...
            messages.forEach(message -> kafkaTemplate.send(build));
            kafkaTemplate.send(build);
        }
    }
    spring:
      ...
      kafka:
        bootstrap-servers: localhost:9092
        producer:
          acks: -1
          transaction-id-prefix: cushion-kafka-tx
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        consumer:
          enable-auto-commit: false
          auto-offset-reset: earliest
          max-poll-records: 100
          value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
          properties:
            isolation.level: read_committed
            spring.json.trusted.packages: "*"
      ...
When trying to send messages, I'm getting the following exception:
``` java
Caused by: java.lang.UnsupportedOperationException: Select a subclass that creates a ProducerRecord value corresponding to the configured Kafka Serializer
    at org.springframework.kafka.support.converter.JsonMessageConverter.convertPayload(JsonMessageConverter.java:96)
    at org.springframework.kafka.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:251)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:583)
```
It seems that the JsonMessageConverter is unable to convert the message payload. I've configured the JsonMessageConverter and BatchMessagingMessageConverter beans, but the error persists. What could be causing this issue, and how can I resolve it? Are there any additional configurations or changes I need to make to ensure proper message conversion when sending?
See the `JsonMessageConverter` source code:

    protected Object convertPayload(Message<?> message) {
        throw new UnsupportedOperationException("Select a subclass that creates a ProducerRecord value "
                + "corresponding to the configured Kafka Serializer");
    }

and that is exactly what you see in your logs.
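
To illustrate the mechanism, here is a minimal, self-contained sketch in plain Java with no Spring dependency. The class names `ConverterDemo`, `BaseConverter`, and `ByteArrayConverter` are hypothetical stand-ins, and the method takes a `String` rather than a `Message<?>` to keep the example short. It only models the pattern: the base class refuses to choose an output type, and a subclass overrides the method to commit to one.

```java
import java.nio.charset.StandardCharsets;

public class ConverterDemo {

    // Hypothetical stand-in for the base converter: it does not know which
    // value type the configured serializer expects, so it refuses to convert.
    static class BaseConverter {
        Object convertPayload(String payload) {
            throw new UnsupportedOperationException(
                    "Select a subclass that creates a ProducerRecord value "
                            + "corresponding to the configured Kafka Serializer");
        }
    }

    // Hypothetical stand-in for a byte[]-producing subclass: it overrides the
    // method and commits to byte[] as the outgoing value type.
    static class ByteArrayConverter extends BaseConverter {
        @Override
        Object convertPayload(String payload) {
            return payload.getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        try {
            new BaseConverter().convertPayload("{\"id\":1}");
        } catch (UnsupportedOperationException e) {
            System.out.println("base class: " + e.getMessage());
        }
        Object value = new ByteArrayConverter().convertPayload("{\"id\":1}");
        System.out.println("subclass produced byte[]: " + (value instanceof byte[]));
    }
}
```

Any send that goes through the base class hits the exception, no matter how the rest of the application is configured.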
Since you use `org.apache.kafka.common.serialization.ByteArrayDeserializer`, the subclass to choose is `org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter`. In other words, the `converter()` bean must return this class instead of its superclass, `JsonMessageConverter`.
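
Applied to the configuration from the question, the change might look like the sketch below. It keeps the original bean names and only swaps the converter class. It has not been run against the asker's project. It also assumes the producer side can accept the `byte[]` value this converter creates, which the answer above does not address.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.ByteArrayJsonMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;

@Configuration
public class Config {

    @Bean
    public RecordMessageConverter converter() {
        // Concrete subclass that converts the payload to byte[] instead of
        // throwing UnsupportedOperationException like the base class does.
        return new ByteArrayJsonMessageConverter();
    }

    @Bean
    public BatchMessagingMessageConverter batchConverter() {
        // The batch converter delegates per-record conversion to converter().
        return new BatchMessagingMessageConverter(converter());
    }
}
```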