spring-boot jackson spring-kafka

Why is the Spring Kafka JsonSerializer unable to serialize a Kafka ProducerRecord?


I'm trying to use the Spring Kafka JsonSerializer to send a JSON object from my producer, but when I send a message I get an exception related to ProducerRecord serialization. I expected JsonSerializer to handle the serialization of my message, but it fails while trying to serialize the ProducerRecord. Here's my setup:

Producer configuration:

import com.app.integration.impl.dto.TransitECS;
import com.app.integration.impl.util.KafkaConstants;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        return Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class
        );
    }

    @Bean
    public ProducerFactory<String, TransitECS> transitECSProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, TransitECS> transitECSKafkaTemplate() {
        KafkaTemplate<String, TransitECS> kafkaTemplate = new KafkaTemplate<>(transitECSProducerFactory());
        kafkaTemplate.setDefaultTopic(KafkaConstants.TRANSIT_ECS_TOPIC);
        return kafkaTemplate;
    }
}

Sender method:

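import com.app.integration.impl.dto.TransitECS;
import jakarta.validation.Valid;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.CompletableFuture;

@RestController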
@RequestMapping("/api/v1/topic")
@RequiredArgsConstructor
public class TopicController {
    private final KafkaTemplate<String, TransitECS> transitECSKafkaTemplate;

    /**
     * This method is used to send the transit ECS to the topic.
     *
     * @param transitECS The transit ECS to be sent.
     */
    @PostMapping("/send/transitECS")
    public CompletableFuture<SendResult<String, TransitECS>> sendTransit(@Valid @RequestBody TransitECS transitECS) {
        return this.transitECSKafkaTemplate.sendDefault(transitECS);
    }
}

Exception Message:

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.apache.kafka.clients.producer.ProducerRecord and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: org.springframework.kafka.support.SendResult["producerRecord"])

Code that is causing the exception:

package com.fasterxml.jackson.databind.ser.impl;

public class UnknownSerializer
    extends ToEmptyObjectSerializer // since 2.13
{
   ...

    @Override
    public void serialize(Object value, JsonGenerator gen, SerializerProvider ctxt) throws IOException
    {
        // 27-Nov-2009, tatu: As per [JACKSON-201] may or may not fail...
        if (ctxt.isEnabled(SerializationFeature.FAIL_ON_EMPTY_BEANS)) {
            failForEmpty(ctxt, value); // <------ IS ENTERING HERE
        }
        super.serialize(value, gen, ctxt);
    }

    protected void failForEmpty(SerializerProvider prov, Object value)
            throws JsonMappingException {
        Class<?> cl = value.getClass();
        if (NativeImageUtil.needsReflectionConfiguration(cl)) {
            prov.reportBadDefinition(handledType(), String.format(
                    "No serializer found for class %s and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS). This appears to be a native image, in which case you may need to configure reflection for the class that is to be serialized",
                    cl.getName()));
        } else {
            prov.reportBadDefinition(handledType(), String.format(
                    "No serializer found for class %s and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)",
                    cl.getName()));
        }
    }
}

The exception indicates that Jackson is attempting to serialize the ProducerRecord, which contains my payload object along with other Kafka data (headers, key, ...). My question is: how is it possible that org.springframework.kafka.support.serializer.JsonSerializer cannot serialize the org.apache.kafka.clients.producer.ProducerRecord class?


Spring Boot version: 3.0.1

Any suggestions or insights would be appreciated!


What I've tried:

    @Bean
    public ProducerFactory<String, TransitECS> transitECSProducerFactory() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        return new DefaultKafkaProducerFactory<>(
                producerConfigs(),
                new StringSerializer(),
                new JsonSerializer<>(mapper)
        );
    }

Solution

  • [SOLVED]

    I resolved this by defining a bean of type ObjectMapper instead of passing one to the JsonSerializer constructor in the DefaultKafkaProducerFactory bean definition.

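    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;

    @Configuration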
    public class ObjectMapperConfig {
        @Bean
        @Primary
        public ObjectMapper objectMapper() {
            ObjectMapper mapper = new ObjectMapper();
            mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
            return mapper;
        }
    }
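
A note on why this probably works: the reference chain in the exception starts at SendResult["producerRecord"], which suggests that the failing serialization is the HTTP response body (the controller returns the SendResult itself) rather than the Kafka value serializer. That would explain why customizing the ObjectMapper inside JsonSerializer had no effect, while a primary ObjectMapper bean, which Spring MVC normally picks up, did. An alternative is to avoid returning SendResult from the controller and map it to a small acknowledgement object instead. The following is only a minimal sketch of that mapping using the JDK alone; FakeSendResult, FakeRecordMetadata, SendAck and toAck are hypothetical stand-ins introduced for illustration, not Spring Kafka types.

```java
import java.util.concurrent.CompletableFuture;

public class SendAckSketch {

    // Hypothetical stand-ins for the Kafka/Spring types, so the sketch compiles with the JDK alone.
    public record FakeRecordMetadata(String topic, int partition, long offset) {}
    public record FakeSendResult<V>(V payload, FakeRecordMetadata metadata) {}

    // Small response body with only simple fields (hypothetical name).
    public record SendAck(String topic, int partition, long offset) {}

    // Once the send future completes, keep only the metadata the HTTP client needs.
    public static <V> CompletableFuture<SendAck> toAck(CompletableFuture<FakeSendResult<V>> future) {
        return future.thenApply(result -> new SendAck(
                result.metadata().topic(),
                result.metadata().partition(),
                result.metadata().offset()));
    }

    public static void main(String[] args) {
        // Simulate an already completed send.
        CompletableFuture<FakeSendResult<String>> sent = CompletableFuture.completedFuture(
                new FakeSendResult<>("payload", new FakeRecordMetadata("transit-ecs", 0, 42L)));
        SendAck ack = toAck(sent).join();
        System.out.println(ack);
    }
}
```

In the real controller the same idea would presumably be sendDefault(transitECS).thenApply(...), reading topic(), partition() and offset() from result.getRecordMetadata(), with the return type changed to a CompletableFuture of the acknowledgement type. That keeps the default ObjectMapper configuration untouched.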