javaspringapache-kafkaspring-kafka

setting the value serializer only works via the constructor with 3 params, the map configuration is not considered


I am learning how to use Spring for Apache Kafka 3.3.1. I am following the official documentation here, here and here to set a JSON value serializer for my KafkaTemplate.

Unfortunately in my case initializing the ProducerFactory<> using the Map<String,Object> configs constructor does not set the serializer and my template uses the default serializers. Only the constructor that sets the config-map + key and value serializers works properly.

It seems that I have missed something, but I cannot see what. I would like to use the simple constructor with a single Map parameter, as shown in the documentation:

return new DefaultKafkaProducerFactory<>(producerConfigs())

My Kafka configuration:

/**
 * Spring configuration that wires the Kafka producer side of the application:
 * a {@link ProducerFactory}, a {@link KafkaTemplate}, a {@link KafkaAdmin} and the
 * {@link NewTopic} to be created.
 *
 * <p>All settings are injected through {@code @Value} placeholders; the defaults
 * are the values after the colon in each placeholder.
 */
@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableKafka
public class KafkaConfiguration {

    @Value("${kafka.producer.bootstrap.servers:kafka-1.hello.com:9092, kafka-2.hello.com:9092}")
    private String kafkaBootstrapServers;

    @Value("${kafka.producer.enable.idempotence:true}")
    private String kafkaProducerEnableIdempotence;

    @Value("${kafka.producer.acks:all}")
    private String kafkaProducerAcks;

    @Value("${kafka.producer.retries:2147483647}")
    private String kafkaProducerRetries;

    @Value("${kafka.producer.linger.ms:0}")
    private String kafkaProducerLingerMs;

    @Value("${kafka.producer.delivery.timeout.ms:120000}")
    private String kafkaProducerDeliveryTimeoutMs;

    @Value("${kafka.producer.request.timeout.ms:30000}")
    private String kafkaProducerRequestTimeoutMs;

    @Value("${kafka.producer.retry.backoff.ms:100}")
    private String kafkaProducerRetryBackoffMs;

    @Value("${kafka.producer.retry.backoff.max.ms:1000}")
    private String kafkaProducerRetryBackoffMaxMs;


    @Value("${kafka.topic.name:topic1}")
    private String kafkaTopicName;

    @Value("${kafka.topic.partitions:1}")
    private int kafkaTopicPartitions;

    @Value("${kafka.topic.replicas:1}")
    private int kafkaTopicReplicas;

    /**
     * Creates the producer factory with explicit key and value serializer instances.
     *
     * <p>The serializers are handed to the three-argument constructor so that
     * {@link ProducerFactory#getKeySerializer()} and
     * {@link ProducerFactory#getValueSerializer()} return them. The map-only
     * alternative ({@code new DefaultKafkaProducerFactory<>(producerConfiguration())})
     * was observed to report {@code null} from both getters, even though the
     * serializer classes are present in the configuration map.
     *
     * @return a producer factory for {@code String} keys and {@code Event} values
     */
    @Bean
    public ProducerFactory<String, Event> producerFactory() {
        // The value side must be a serializer: a JsonDeserializer cannot act as the
        // producer's value serializer.
        DefaultKafkaProducerFactory<String, Event> factory = new DefaultKafkaProducerFactory<>(
                producerConfiguration(),
                new StringSerializer(),
                new JsonSerializer<>());
        factory.setProducerPerThread(true);
        return factory;
    }

    /**
     * Creates the {@link KafkaTemplate} on top of {@link #producerFactory()} and logs
     * the effective factory settings at debug level.
     *
     * @return the template used to publish {@code Event} messages
     */
    @Bean
    public KafkaTemplate<String, Event> kafkaTemplate() {
        var factory = producerFactory();
        log.debug("initializing a KafkaTemplate using the following setting: {{}}", factoryConfigurationToString(factory));
        return new KafkaTemplate<>(factory);
    }

    /**
     * Creates the {@link KafkaAdmin}, configured only with the bootstrap servers.
     *
     * @return the admin bean
     */
    @Bean
    public KafkaAdmin admin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        return new KafkaAdmin(configs);
    }

    /**
     * Declares the topic built from the configured name, partition count and
     * replica count.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic topic() {
        log.debug(
                "creating a new kafka topic: \"{name: \"{}\", partitions: {}, replicas: {}}\"",
                kafkaTopicName,
                kafkaTopicPartitions,
                kafkaTopicReplicas);

        return TopicBuilder.name(kafkaTopicName)
                .partitions(kafkaTopicPartitions)
                .replicas(kafkaTopicReplicas)
                .build();
    }

    /**
     * Builds the producer configuration map from the injected settings.
     *
     * @return a new mutable map keyed by {@link ProducerConfig} property names
     */
    private Map<String, Object> producerConfiguration() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerEnableIdempotence);
        configs.put(ProducerConfig.ACKS_CONFIG, kafkaProducerAcks);
        configs.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerRetries);
        configs.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerLingerMs);
        configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaProducerDeliveryTimeoutMs);
        configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProducerRequestTimeoutMs);
        configs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaProducerRetryBackoffMs);
        configs.put(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, kafkaProducerRetryBackoffMaxMs);
        return configs;
    }

    /**
     * Renders the factory's configuration properties and the class names of its
     * key/value serializers as a single line for logging.
     *
     * @param producerFactory the factory to describe
     * @return a string of the form
     *         {@code configuration: {"k": "v", ...}, key-serializer: X, value-serializer: Y};
     *         a serializer that is not set on the factory is rendered as {@code null}
     */
    private String factoryConfigurationToString(ProducerFactory<String, Event> producerFactory) {
        var keySerializer = producerFactory.getKeySerializer();
        var keySerializerAsString = Objects.isNull(keySerializer) ? "null" : keySerializer.getClass().getName();

        var valueSerializer = producerFactory.getValueSerializer();
        var valueSerializerAsString = Objects.isNull(valueSerializer) ? "null" : valueSerializer.getClass().getName();

        var sb = new StringBuilder().append("configuration: ").append("{");
        var properties = producerFactory.getConfigurationProperties();
        properties.forEach((key, value) -> sb.append(String.format("\"%s\": \"%s\", ", key, value)));

        // Drop the trailing ", " separator; with no properties there is nothing to
        // drop, and trimming would eat the opening brace instead.
        if (!properties.isEmpty()) {
            sb.setLength(sb.length() - 2);
        }
        sb.append("}, ");
        sb.append("key-serializer: ").append(keySerializerAsString).append(", ");
        sb.append("value-serializer: ").append(valueSerializerAsString);
        return sb.toString();
    }
}

I always check the result using a Topic browser as well. In the 1st case I can see binary content on the topic, not JSON.

Log using the constructor with only Map:

DEBUG 200 --- [kafka-producer] [           main] c.r.g.s.m.p.c.KafkaConfiguration         : initializing a KafkaTemplate using the following setting:
{
   "configuration":{
      "retries":"5",
      "enable.idempotence":"true",
      "retry.backoff.max.ms":"1000",
      "value.serializer":"class org.springframework.kafka.support.serializer.JsonSerializer",
      "request.timeout.ms":"30000",
      "acks":"all",
      "bootstrap.servers":"kafka-1.hello.com:9092, kafka-2.hello.com:9092",
      "delivery.timeout.ms":"120000",
      "retry.backoff.ms":"100",
      "key.serializer":"class org.apache.kafka.common.serialization.StringSerializer",
      "linger.ms":"0"
   },
   "key-serializer":null,   <-- WRONG
   "value-serializer":null  <-- WRONG
}
DEBUG 200 --- [kafka-producer] [           main] c.r.g.s.m.p.c.KafkaConfiguration         : creating a new kafka topic:
{
   "name":"incoming",
   "partitions":2,
   "replicas":2
}

Log using the constructor with three parameters:

DEBUG 201 --- [kafka-producer] [           main] c.r.g.s.m.p.c.KafkaConfiguration     
    : initializing a KafkaTemplate using the following setting:
{
   "configuration":{
      "retries":"5",
      "enable.idempotence":"true",
      "retry.backoff.max.ms":"1000",
      "value.serializer":"class org.springframework.kafka.support.serializer.JsonSerializer",
      "request.timeout.ms":"30000",
      "acks":"all",
      "bootstrap.servers":"kafka-1.hello.com:9092, kafka-2.hello.com:9092",
      "delivery.timeout.ms":"120000",
      "retry.backoff.ms":"100",
      "key.serializer":"class org.apache.kafka.common.serialization.StringSerializer",
      "linger.ms":"0"
   },
   "key-serializer":"org.apache.kafka.common.serialization.StringSerializer",         <-- OK
   "value-serializer":"org.springframework.kafka.support.serializer.JsonSerializer"   <-- OK
}
DEBUG 201 --- [kafka-producer] [           main] c.r.g.s.m.p.c.KafkaConfiguration         : creating a new kafka topic:
{
   "name":"incoming",
   "partitions":2,
   "replicas":2
}

Solution

  • The problem was related to my kafka-consumer configuration. My kafka-producer configuration was correct.

    If you want to provide type info for the JsonDeserializer then you need to use the constructor of the ConsumerFactory:

    /**
     * Builds the consumer factory with explicit deserializer instances, so that the
     * value deserializer is constructed with {@code Event} as its target type.
     *
     * @return a consumer factory for {@code String} keys and {@code Event} values
     */
    @Bean
    public ConsumerFactory<String, Event> consumerConfigs() {
        Map<String, Object> configs = consumerConfiguration();
        StringDeserializer keyDeserializer = new StringDeserializer();
        // Typed deserializer: the target class is passed to the constructor.
        JsonDeserializer<Event> valueDeserializer = new JsonDeserializer<>(Event.class);
        return new DefaultKafkaConsumerFactory<>(configs, keyDeserializer, valueDeserializer);
    }
    

    If you do not need to provide type info, then you can let the JsonDeserializer be created through its default constructor by specifying it in the Map config.

    /**
     * Builds the consumer factory from the configuration map alone; the key and
     * value deserializer classes are taken from the entries of that map.
     *
     * @return a consumer factory for {@code String} keys and {@code Event} values
     */
    @Bean
    public ConsumerFactory<String, Event> consumerConfigs() {
        Map<String, Object> configs = consumerConfiguration();
        return new DefaultKafkaConsumerFactory<>(configs);
    }
    
    
    /**
     * Builds the consumer configuration map: bootstrap servers plus the key and
     * value deserializer classes. Further entries are elided in this snippet.
     *
     * @return a new mutable map keyed by {@link ConsumerConfig} property names
     */
    private Map<String, Object> consumerConfiguration() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    
        // Deserializers are registered by class, not as instances.
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        ...
        return configs;
    }