
setting the value serializer only works via the constructor with 3 params, the map configuration is not considered

I am learning how to use Spring for Apache Kafka 3.3.1. I am following the official documentation here, here and here to set JSON value serializer for my KafkaTemplate.

Unfortunately in my case initializing the ProducerFactory<> using the Map<String,Object> configs constructor does not set the serializer and my template uses the default serializers. Only the constructor that sets the config-map + key and value serializers works properly.

It seems that I really missed something, but I can not see what. I would like to use the simple constructor with one Map parameter as it is written in the doc:

return new DefaultKafkaProducerFactory<>(producerConfigs())

My Kafka configuration:

public class KafkaConfiguration {

    private String kafkaBootstrapServers;

    private String kafkaProducerEnableIdempotence;

    private String kafkaProducerAcks;

    private String kafkaProducerRetries;

    private String kafkaProducerLingerMs;

    private String kafkaProducerDeliveryTimeoutMs;

    private String kafkaProducerRequestTimeoutMs;

    private String kafkaProducerRetryBackoffMs;

    private String kafkaProducerRetryBackoffMaxMs;

    private String kafkaTopicName;

    private int kafkaTopicPartitions;

    private int kafkaTopicReplicas;

    public ProducerFactory<String, Event> producerFactory() {
        // set the key and value serializer this way does not work
        // KafkaTemplate uses the default serializers
        //DefaultKafkaProducerFactory<String, Event> factory = new DefaultKafkaProducerFactory<>(producerConfiguration());

        // this sets properly the serializers
        DefaultKafkaProducerFactory<String, Event> factory = new DefaultKafkaProducerFactory<>(
                new StringSerializer(),
                new JsonDeserializer<>(Event.class));
        return factory;

    public KafkaTemplate<String, Event> kafkaTemplate() {
        var factory = producerFactory();
        log.debug("initializing a KafkaTemplate using the following setting: {{}}", factoryConfigurationToString(factory));
        return new KafkaTemplate<>(factory);

    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        return new KafkaAdmin(configs);

    public NewTopic topic() {
                "creating a new kafka topic: \"{name: \"{}\", partitions: {}, replicas: {}}\"",


    private Map<String, Object> producerConfiguration() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerEnableIdempotence);
        configs.put(ProducerConfig.ACKS_CONFIG, kafkaProducerAcks);
        configs.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerRetries);
        configs.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerLingerMs);
        configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaProducerDeliveryTimeoutMs);
        configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProducerRequestTimeoutMs);
        configs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaProducerRetryBackoffMs);
        configs.put(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, kafkaProducerRetryBackoffMaxMs);
        return configs;

    private String factoryConfigurationToString(ProducerFactory<String, Event> producerFactory) {
        var keySerializer = producerFactory.getKeySerializer();
        var keySerializerAsString = Objects.isNull(keySerializer) ? "null" : keySerializer.getClass().getName();

        var valueSerializer = producerFactory.getValueSerializer();
        var valueSerializerAsString = Objects.isNull(valueSerializer) ? "null" : valueSerializer.getClass().getName();

        var sb = new StringBuilder().append("configuration: ").append("{");
                forEach((key, value) -> sb.append(String.format("\"%s\": \"%s\", ", key, value)));

        sb.setLength(sb.length() - 2);
        sb.append("}, ");
        sb.append("key-serializer: ").append(keySerializerAsString).append(", ");
        sb.append("value-serializer: ").append(valueSerializerAsString);
        return sb.toString();

I always check the result using a Topic browser as well. In the 1st case I can see binary content on the topic, not JSON.

Log using the constructor with only Map:

DEBUG 200 --- [kafka-producer] [           main] c.r.g.s.m.p.c.KafkaConfiguration         : initializing a KafkaTemplate using the following setting:
      "key.serializer":"class org.apache.kafka.common.serialization.StringSerializer",
   "key-serializer":null,   <-- WRONG
   "value-serializer":null  <-- WRONG
DEBUG 200 --- [kafka-producer] [           main] c.r.g.s.m.p.c.KafkaConfiguration         : creating a new kafka topic:

Log using the constructor with three parameters:

DEBUG 201 --- [kafka-producer] [           main] c.r.g.s.m.p.c.KafkaConfiguration     
    : initializing a KafkaTemplate using the following setting:
      "key.serializer":"class org.apache.kafka.common.serialization.StringSerializer",
   "key-serializer":"org.apache.kafka.common.serialization.StringSerializer",         <-- OK
   "value-serializer":""   <-- OK
DEBUG 201 --- [kafka-producer] [           main] c.r.g.s.m.p.c.KafkaConfiguration         : creating a new kafka topic:


  • The problem was related to my kafka-consumer configuration. My kafka-producer configuration was correct.

    If you want to provide type info for the JsonDeserializer then you need to use the constructor of the ConsumerFactory:

    public ConsumerFactory<String, Event> consumerConfigs() {
        return new DefaultKafkaConsumerFactory<>(
                new StringDeserializer(),
                new JsonDeserializer<>(Event.class)); // use typed serializer

    If you do not need to provide type info, then you can use the default constructor of the the JsonDeserializer with the Map config.

    public ConsumerFactory<String, Event> consumerConfigs() {
        return new DefaultKafkaConsumerFactory<>(consumerConfiguration());
    private Map<String, Object> consumerConfiguration() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return configs;