
Kafka deserializing key/value exception: log the original incoming message


I am learning how Kafka handles exceptions. I have a message consumer that expects incoming JSON data so that the JsonDeserializer can do its job properly.

If I send invalid (non-JSON) string content to the Kafka topic, the following deserialization error appears:

Error deserializing key/value for partition aaa.bbb.response-0 at offset 2.
If needed, please seek past the record to continue consumption.

That is fine, and I catch and log it. This is my consumer configuration:

@Bean
public ConsumerFactory<String, EventEnvelop> consumerConfigs() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    log.debug("kafka consumer bootstrap addresses: {}", bootstrapAddresses);
    configs.forEach((key, value) -> log.debug("kafka consumer configuration: {\"{}\": \"{}\"", key, value));

    return new DefaultKafkaConsumerFactory<>(configs);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, EventEnvelop> kafkaListenerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, EventEnvelop> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerConfigs());
    factory.setConcurrency(1);
    factory.setBatchListener(false);
    factory.getContainerProperties().setPollTimeout(3000);
    factory.setCommonErrorHandler(new KafkaConsumerErrorHandler());
    return factory;
}

My message consumer looks like this:

@KafkaListener(
        id = "response-topic-listener",
        topics = "${app.kafka.topic.response}",
        groupId = "response-group-1",
        containerFactory = "kafkaListenerFactory")
public void listen(EventEnvelop message) {
    log.info("new incoming message: {}", message);
}

This is how I catch the serialization error:

public class KafkaConsumerErrorHandler implements CommonErrorHandler {

    @Override
    public boolean handleOne(Exception exception,
                             ConsumerRecord<?, ?> record,
                             Consumer<?, ?> consumer,
                             MessageListenerContainer container) {
        return handle(exception, consumer);
    }

    @Override
    public void handleOtherException(Exception exception,
                                     Consumer<?, ?> consumer,
                                     MessageListenerContainer container,
                                     boolean batchListener) {
        handle(exception, consumer);
    }

    private boolean handle(Exception exception, Consumer<?,?> consumer) {
        if (exception instanceof RecordDeserializationException e) {
            // log.debug("Incoming message: {}", getIncomingMessage(...)); <-- how do I implement getIncomingMessage?
            log.error("Unable to parse the incoming record. {}", e.getMessage());
            consumer.seek(e.topicPartition(), e.offset() + 1L);
            consumer.commitSync();
        } else {
            log.error("An unexpected error occurred while trying to handle the incoming message: ", exception);
        }
        return false;
    }
}

I want to log the original incoming message that the parser was unable to parse. I have searched for this and explored the consumer and container objects in debug mode, but had no luck.


My error handler always ends up in handleOtherException; handleOne is never called.


Message order is important to me, so I use setConcurrency(1) and setBatchListener(false) in my ConcurrentKafkaListenerContainerFactory configuration.

Is it possible to get the original incoming message somehow? How can I implement the getIncomingMessage(...) method in my code?


Solution

  • When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because the failure occurs inside poll(), before any record is handed to the container. This is why your error handler only ever reaches handleOtherException. To solve this problem, Spring Kafka provides the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a null value and adds a header containing a DeserializationException with the cause and the raw bytes.

    In your case, handling that DeserializationException gives you the raw bytes of the failing record through its getData() method.

    Please check here for details and examples: https://docs.spring.io/spring-kafka/reference/kafka/serdes.html#error-handling-deserializer
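Applied to the configuration from the question, a minimal sketch might look like the following. It assumes Spring for Apache Kafka 2.8 or later (for `DefaultErrorHandler`) and Java 16+ (for the `instanceof` pattern the question already uses). `EventEnvelop` is the question's own type, and reading the bootstrap servers from the `spring.kafka.bootstrap-servers` property is an assumption. Replacing the custom `KafkaConsumerErrorHandler` with Spring's `DefaultErrorHandler` plus a logging recoverer is one option, not the only one.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaConsumerConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    // Assumption: the bootstrap servers come from this Spring Boot property.
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddresses;

    @Bean
    public ConsumerFactory<String, EventEnvelop> consumerConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // ErrorHandlingDeserializer wraps JsonDeserializer: a failure no longer breaks poll(),
        // it yields a null value plus a header carrying the DeserializationException.
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        configs.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, EventEnvelop> kafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, EventEnvelop> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerConfigs());
        factory.setConcurrency(1);
        factory.setBatchListener(false);
        factory.getContainerProperties().setPollTimeout(3000);
        // FixedBackOff(0, 0) disables retries: the recoverer logs the bad record once
        // and the container continues with the next offset.
        factory.setCommonErrorHandler(new DefaultErrorHandler(
                (record, exception) -> log.error("Unable to parse record {}-{}@{}, raw value: {}",
                        record.topic(), record.partition(), record.offset(),
                        getIncomingMessage(exception)),
                new FixedBackOff(0L, 0L)));
        return factory;
    }

    // Walks the cause chain because the container may wrap the DeserializationException.
    static String getIncomingMessage(Exception exception) {
        Throwable t = exception;
        while (t != null) {
            if (t instanceof DeserializationException de) {
                byte[] data = de.getData();
                return data == null ? "<null>" : new String(data, StandardCharsets.UTF_8);
            }
            t = t.getCause();
        }
        return "<raw bytes unavailable>";
    }
}
```

Concurrency stays at 1 and the recoverer runs on the consumer thread, so the records after the bad one are still processed in order.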
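The raw bytes of a record that failed to deserialize are not guaranteed to be valid UTF-8. A hypothetical helper like `RawPayloads.toLoggableString` (not part of Spring Kafka) can decode them strictly and fall back to hex when decoding fails. This sketch assumes Java 17+ for `java.util.HexFormat`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.HexFormat;

public final class RawPayloads {

    private RawPayloads() {
    }

    // Returns valid UTF-8 as text; malformed input is rendered as lowercase hex
    // instead of being silently replaced with U+FFFD characters.
    public static String toLoggableString(byte[] data) {
        if (data == null) {
            return "<null>";
        }
        try {
            return StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(data))
                    .toString();
        } catch (CharacterCodingException e) {
            return "hex:" + HexFormat.of().formatHex(data);
        }
    }
}
```

In an error handler, pass it the bytes returned by `DeserializationException.getData()`.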