Tags: java, spring-boot, apache-kafka, spring-kafka

Spring Kafka: The class is not in the trusted packages


Before a library update, my Spring Boot/Kafka application used the class org.telegram.telegrambots.api.objects.Update to post messages to a Kafka topic. It now uses org.telegram.telegrambots.meta.api.objects.Update. As you can see, the two classes are in different packages.

After restarting the application, I ran into the following issue:

[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:221) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:967) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:93) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1144) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) ~[kafka-clients-1.1.0.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

This is my config:

@EnableAsync
@Configuration
public class ApplicationConfig {

    @Bean
    public StringJsonMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

}

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);

        return props;
    }

    @Bean
    public ProducerFactory<String, Update> updateProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Update> updateKafkaTemplate() {
        return new KafkaTemplate<>(updateProducerFactory());
    }

}

@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.max.poll.interval.ms}")
    private String kafkaConsumerMaxPollIntervalMs;

    @Value("${kafka.consumer.max.poll.records}")
    private String kafkaConsumerMaxPollRecords;

    @Value("${kafka.topic.telegram.fenix.bot.update.consumer.concurrency}")
    private Integer updateConsumerConcurrency;

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(consumerFactory(kafkaProperties));

        return factory;
    }

    @Bean
    public ConsumerFactory<String, Update> updateConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Update.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Update> updateKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

        ConcurrentKafkaListenerContainerFactory<String, Update> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(updateConsumerFactory(kafkaProperties));
        factory.setConcurrency(updateConsumerConcurrency);

        return factory;
    }

}

application.properties

spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

How can I solve this issue and let Kafka deserialize the old messages into the new class?

UPDATED

This is my listener:

@Component
public class UpdateConsumer {

    @KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
    public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {

        //do some logic here

        ack.acknowledge();
    }

}

Solution

  • See the documentation.

    Starting with version 2.1, type information can be conveyed in record Headers, allowing the handling of multiple types. In addition, the serializer/deserializer can be configured using Kafka properties.

    JsonSerializer.ADD_TYPE_INFO_HEADERS (default true); set to false to disable this feature on the JsonSerializer (sets the addTypeInfo property).

    JsonDeserializer.KEY_DEFAULT_TYPE; fallback type for deserialization of keys if no header information is present.

    JsonDeserializer.VALUE_DEFAULT_TYPE; fallback type for deserialization of values if no header information is present.

    JsonDeserializer.TRUSTED_PACKAGES (default java.util, java.lang); comma-delimited list of package patterns allowed for deserialization; * means deserialize all.

    By default, the serializer will add type information to the headers.
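
To make the quoted properties concrete, here is a minimal, stdlib-only sketch of how a trusted-packages check and a default-type fallback can interact. It is a simplified model, not Spring Kafka's actual implementation: the class `TrustedPackagesSketch` and its methods are hypothetical names, and the package match is assumed to be exact (plus `*` for "trust all").

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of a trusted-packages check; NOT Spring Kafka's real code.
final class TrustedPackagesSketch {

    // Mirrors the documented defaults: java.util and java.lang.
    private final Set<String> trustedPackages =
            new LinkedHashSet<>(Arrays.asList("java.util", "java.lang"));

    void addTrustedPackages(String... packages) {
        trustedPackages.addAll(Arrays.asList(packages));
    }

    // Returns the class name to deserialize into.
    // typeIdHeader is the class name carried in the record header, or null if absent.
    String resolveType(String typeIdHeader, String defaultType) {
        if (typeIdHeader == null) {
            return defaultType; // no header: fall back to the configured default type
        }
        if (!isTrusted(typeIdHeader)) {
            throw new IllegalArgumentException("The class '" + typeIdHeader
                    + "' is not in the trusted packages: " + trustedPackages);
        }
        return typeIdHeader;
    }

    private boolean isTrusted(String className) {
        if (trustedPackages.contains("*")) {
            return true; // "*" means trust everything
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trustedPackages.contains(packageName);
    }

    public static void main(String[] args) {
        TrustedPackagesSketch sketch = new TrustedPackagesSketch();
        sketch.addTrustedPackages("org.telegram.telegrambots.meta.api.objects");
        try {
            // An old record still carries the old class name in its header.
            sketch.resolveType("org.telegram.telegrambots.api.objects.Update",
                    "org.telegram.telegrambots.meta.api.objects.Update");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, the old records fail because their header names a class in a package outside the trusted list, while records without a type header would simply use the default type.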

    See the Boot documentation.

    Similarly, you can disable the JsonSerializer default behavior of sending type information in headers:

    spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
    spring.kafka.producer.properties.spring.json.add.type.headers=false
    

    Or you can add type mapping to the inbound message converter to map the source type to the destination type.
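
The type-mapping idea can be sketched with the standard library alone: a lookup table from the type id found in the header to the class that should actually be instantiated. This is only an illustration under stated assumptions. `TypeMappingSketch` and `NewUpdate` are hypothetical stand-ins, and in Spring Kafka itself the corresponding hook is, as far as I know, an id-to-class mapping on the type mapper (for example `setIdClassMapping` on `DefaultJackson2JavaTypeMapper`); check the API of your version before relying on it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of mapping a header type id to a target class; NOT Spring Kafka's real code.
final class TypeMappingSketch {

    // Stand-in for the new org.telegram.telegrambots.meta.api.objects.Update class.
    static final class NewUpdate { }

    private final Map<String, Class<?>> idClassMapping = new HashMap<>();
    private final Class<?> defaultType;

    TypeMappingSketch(Class<?> defaultType) {
        this.defaultType = defaultType;
    }

    void addMapping(String typeId, Class<?> target) {
        idClassMapping.put(typeId, target);
    }

    // Resolves the class to deserialize into from the (possibly null) header value.
    Class<?> toJavaType(String typeIdHeader) {
        if (typeIdHeader == null) {
            return defaultType; // no header: use the fallback type
        }
        Class<?> mapped = idClassMapping.get(typeIdHeader);
        if (mapped != null) {
            return mapped; // explicit mapping wins, even if the old class no longer exists
        }
        throw new IllegalArgumentException("No mapping for type id '" + typeIdHeader + "'");
    }

    public static void main(String[] args) {
        TypeMappingSketch mapper = new TypeMappingSketch(NewUpdate.class);
        // Map the old fully qualified name to the new class.
        mapper.addMapping("org.telegram.telegrambots.api.objects.Update", NewUpdate.class);
        Class<?> resolved = mapper.toJavaType("org.telegram.telegrambots.api.objects.Update");
        System.out.println(resolved.getSimpleName()); // prints "NewUpdate"
    }
}
```

The point of the mapping is that the old class name is treated as an opaque identifier, so records written before the library update can be read into the new class without the old class being on the classpath.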

    EDIT

    Having said that, what version are you using?