I’m trying to use the Spring Kafka JsonSerializer to send a JSON object from my producer, but I'm getting an exception related to ProducerRecord serialization when sending a message. I expected JsonSerializer to handle the serialization of my message, but it fails when trying to serialize the ProducerRecord. Here’s my setup:
import com.app.integration.impl.dto.TransitECS;
import com.app.integration.impl.util.KafkaConstants;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        return Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class
        );
    }

    @Bean
    public ProducerFactory<String, TransitECS> transitECSProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, TransitECS> transitECSKafkaTemplate() {
        KafkaTemplate<String, TransitECS> kafkaTemplate = new KafkaTemplate<>(transitECSProducerFactory());
        kafkaTemplate.setDefaultTopic(KafkaConstants.TRANSIT_ECS_TOPIC);
        return kafkaTemplate;
    }
}
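import com.app.integration.impl.dto.TransitECS;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.CompletableFuture;

@RestController
@RequestMapping("/api/v1/topic")
@RequiredArgsConstructor
public class TopicController {

    private final KafkaTemplate<String, TransitECS> transitECSKafkaTemplate;

    /**
     * This method is used to send the transit ECS to the topic.
     *
     * @param transitECS The transit ECS to be sent.
     */
    @PostMapping("/send/transitECS")
    public CompletableFuture<SendResult<String, TransitECS>> sendTransit(@Valid @RequestBody TransitECS transitECS) {
        return this.transitECSKafkaTemplate.sendDefault(transitECS);
    }
}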
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.apache.kafka.clients.producer.ProducerRecord and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: org.springframework.kafka.support.SendResult["producerRecord"])
package com.fasterxml.jackson.databind.ser.impl;

public class UnknownSerializer
    extends ToEmptyObjectSerializer // since 2.13
{
    ...
    @Override
    public void serialize(Object value, JsonGenerator gen, SerializerProvider ctxt) throws IOException
    {
        // 27-Nov-2009, tatu: As per [JACKSON-201] may or may not fail...
        if (ctxt.isEnabled(SerializationFeature.FAIL_ON_EMPTY_BEANS)) {
            failForEmpty(ctxt, value); // <------ IS ENTERING HERE
        }
        super.serialize(value, gen, ctxt);
    }

    protected void failForEmpty(SerializerProvider prov, Object value)
            throws JsonMappingException {
        Class<?> cl = value.getClass();
        if (NativeImageUtil.needsReflectionConfiguration(cl)) {
            prov.reportBadDefinition(handledType(), String.format(
                    "No serializer found for class %s and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS). This appears to be a native image, in which case you may need to configure reflection for the class that is to be serialized",
                    cl.getName()));
        } else {
            prov.reportBadDefinition(handledType(), String.format(
                    "No serializer found for class %s and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)",
                    cl.getName()));
        }
    }
}
The exception indicates that Jackson is attempting to serialize the ProducerRecord that contains my payload object and other Kafka data (headers, key, ...).
The question is: how is it possible that org.springframework.kafka.support.serializer.JsonSerializer cannot serialize the org.apache.kafka.clients.producer.ProducerRecord class?
3.0.1
Any suggestions or insights would be appreciated!
I also tried disabling SerializationFeature.FAIL_ON_EMPTY_BEANS, but without success:
@Bean
public ProducerFactory<String, TransitECS> transitECSProducerFactory() {
    ObjectMapper mapper = new ObjectMapper();
    mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
    mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
    return new DefaultKafkaProducerFactory<>(
            producerConfigs(),
            new StringSerializer(),
            new JsonSerializer<>(mapper)
    );
}
[SOLVED]
I resolved this by defining a bean of type ObjectMapper instead of passing a mapper to the JsonSerializer constructor in the DefaultKafkaProducerFactory bean definition.
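import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class ObjectMapperConfig {

    @Bean
    @Primary
    public ObjectMapper objectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        // Let Jackson read private fields directly, even without getters.
        mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        return mapper;
    }
}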
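
A note on the exception, offered as a reading of the stack trace rather than a confirmed diagnosis: the reference chain starts at SendResult["producerRecord"], which suggests Jackson was serializing the SendResult returned by the controller (the HTTP response body), not the TransitECS payload going to Kafka. That would also explain why an application-wide ObjectMapper bean changed the behaviour while the mapper passed to JsonSerializer did not. Under that assumption, an alternative is to map the future to a small response object before returning it. The sketch below uses only the JDK so it can run on its own; FakeSendResult, FakeRecordMetadata, SendAck, toAck and the topic name are hypothetical stand-ins, not Spring Kafka types.

```java
import java.util.concurrent.CompletableFuture;

public class SendResultMappingSketch {

    // Hypothetical stand-in for Kafka's record metadata (topic, partition, offset).
    public record FakeRecordMetadata(String topic, int partition, long offset) {}

    // Hypothetical stand-in for the send result: the payload plus its metadata.
    public record FakeSendResult<V>(V payload, FakeRecordMetadata metadata) {}

    // Hypothetical response DTO: plain fields only, so any JSON mapper can write it.
    public record SendAck(String topic, int partition, long offset) {}

    // Map the asynchronous send result to the DTO without blocking the caller.
    public static <V> CompletableFuture<SendAck> toAck(CompletableFuture<FakeSendResult<V>> future) {
        return future.thenApply(result -> new SendAck(
                result.metadata().topic(),
                result.metadata().partition(),
                result.metadata().offset()));
    }

    public static void main(String[] args) {
        // Simulate an already completed send; "transit-ecs" is a made-up topic name.
        CompletableFuture<FakeSendResult<String>> sent = CompletableFuture.completedFuture(
                new FakeSendResult<>("payload", new FakeRecordMetadata("transit-ecs", 0, 42L)));

        SendAck ack = toAck(sent).join();
        System.out.println(ack);
    }
}
```

In the real controller the same idea would be a thenApply on the future returned by sendDefault, reading topic, partition and offset from the result's RecordMetadata, so the ProducerRecord never reaches the HTTP message converter.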