Tags: java, spring, apache-kafka, spring-kafka, apache-kafka-streams

Kafka streams correctly handles message but throws Deserialization exception


I am trying out Kafka Streams for the first time and I am seeing behaviour I can't quite understand. I have a producer sending messages to an input topic in the format:

{"fooId": "Bar", "timestamp": 1717764162000}

This should be consumed, and a message saying "Bar: Foo is online" should be produced to an output topic.

Now, when the first message is processed I do see the output message on the output topic and I am able to consume it. However, my application throws a deserialization exception and then stops working. My serdes look alright to me; I can't see what I am doing wrong. Here's my code:

Kafka config:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    KafkaStreamsConfiguration kStreamsConfig(){
        Map<String, Object> props = new HashMap<>();
        props.put(APPLICATION_ID_CONFIG, "streams-app");
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
//        props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, HEARTBEAT_SERDE.getClass().getName());
        props.put(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

        return new KafkaStreamsConfiguration(props);
    }
}
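As an aside, if one did want the default value serde set globally (the commented-out line above), spring-kafka's `JsonSerde` can also be configured purely through properties rather than by instantiating it. This is only a sketch, assuming the standard spring-kafka config keys, and is not the fix for the problem in this question:

```java
// Hypothetical sketch: registering a JSON default value serde via config,
// instead of the commented-out HEARTBEAT_SERDE line above.
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
        org.springframework.kafka.support.serializer.JsonSerde.class.getName());
// Tell the serde which type to deserialize into, and which packages to trust.
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, FooHeartbeat.class.getName());
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.domotzland.clockdude.model");
```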

FooHeartbeat model:

public class FooHeartbeat {
    private String fooId;
    private long timestamp;

    public String getFooId() {
        return fooId;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setFooId(String fooId) {
        this.fooId = fooId;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}
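What `JsonSerde` does for this POJO can be illustrated with a toy round-trip using only the JDK. The `String.format` serializer and regex deserializer below are purely illustrative stand-ins (a real serde uses Jackson), just to show the byte-level contract between producer and consumer:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ToySerdeDemo {

    // Minimal POJO mirroring FooHeartbeat.
    static class Heartbeat {
        String fooId;
        long timestamp;
    }

    // Serialize to the wire format shown in the question.
    static String serialize(Heartbeat hb) {
        return String.format("{\"fooId\": \"%s\", \"timestamp\": %d}", hb.fooId, hb.timestamp);
    }

    // Illustrative regex-based deserializer (a real serde would use Jackson).
    static Heartbeat deserialize(String json) {
        Matcher m = Pattern
                .compile("\\{\"fooId\": \"([^\"]+)\", \"timestamp\": (\\d+)\\}")
                .matcher(json);
        if (!m.matches()) {
            throw new IllegalArgumentException("unparseable: " + json);
        }
        Heartbeat hb = new Heartbeat();
        hb.fooId = m.group(1);
        hb.timestamp = Long.parseLong(m.group(2));
        return hb;
    }

    public static void main(String[] args) {
        Heartbeat hb = deserialize("{\"fooId\": \"Bar\", \"timestamp\": 1717764162000}");
        System.out.println(hb.fooId + " " + hb.timestamp); // prints "Bar 1717764162000"
    }
}
```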

Heartbeat processor:

@Component
public class HeartbeatProcessor {

    private final Logger logger = LoggerFactory.getLogger(HeartbeatProcessor.class);

    private static final Serde<String> STRING_SERDE = Serdes.String();
    public static final Serde<FooHeartbeat> HEARTBEAT_SERDE = new JsonSerde<>(FooHeartbeat.class);

    private final String inputTopic = "heartbeats";
    private final String outputTopic = "foo-status";
    private static final Duration OFFLINE_THRESHOLD = Duration.ofMinutes(2);

    @Autowired
    void buildPipeline(StreamsBuilder streamsBuilder) {

        KStream<String, FooHeartbeat> messageStream = streamsBuilder
                .stream(inputTopic, Consumed.with(STRING_SERDE, HEARTBEAT_SERDE));

        KStream<String, String> outputStream = messageStream
                .map((key, fooHeartbeat) -> {
                    logger.info("heartbeat: {}, with timestamp: {}", fooHeartbeat.getFooId(), fooHeartbeat.getTimestamp());
                    return KeyValue.pair(fooHeartbeat.getFooId(), "Foo: is online");
                });

        outputStream.to(outputTopic, Produced.with(STRING_SERDE, STRING_SERDE));
    }
}

Finally, here's the exception:


org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking processor: KSTREAM-MAPVALUES-0000000004. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: unknown because key is null, and value: com.domotzland.clockdude.model.FooHeartbeat.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:165) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) ~[kafka-streams-3.6.2.jar:na]
Caused by: java.lang.ClassCastException: class com.domotzland.clockdude.model.FooHeartbeat cannot be cast to class java.lang.String (com.domotzland.clockdude.model.FooHeartbeat is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$2(AbstractStream.java:111) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) ~[kafka-streams-3.6.2.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154) ~[kafka-streams-3.6.2.jar:na]
    ... 14 common frames omitted

2024-06-14T11:02:22.011+02:00 ERROR 57204 --- [clockdude] [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [streams-app-c7912c78-c858-47b1-b367-a62289f5986e] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. 

    (same stack trace as above)

I've tried playing around with the serialization; it must be something related to the conversion from String to FooHeartbeat and to the heartbeat serde. If I switch the input to a String and use a String serde, everything seems to work just fine. Again, note that for the first consumed message the output is produced correctly. Then the application stops after this exception with:

  2024-06-14T11:02:22.522+02:00  INFO 57204 --- [clockdude] [86e-CloseThread] org.apache.kafka.streams.KafkaStreams    : stream-client [streams-app-c7912c78-c858-47b1-b367-a62289f5986e] State transition from PENDING_ERROR to ERROR

This is driving me crazy, any help would be very much appreciated! Thanks!!


Solution

  • Solved it!

    There was nothing wrong with the code; it works as it is. I was just dumb enough to have a separate pipeline in my code that was using the same topics. I had previously created it as an example and forgot to delete it or comment it out. That pipeline was expecting a String value on the input topic, hence the deserialization error.

    This also explains why I was getting the correct output AND the exception.

    @Matthias J. Sax thanks a lot for the reply
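The failure mode here can be reproduced in miniature. Because Java generics are erased, a processor typed for `String` only fails at the implicit cast when a record actually reaches its lambda, which is why the leftover pipeline subscribed happily and only blew up at `KSTREAM-MAPVALUES` at processing time. A JDK-only sketch (no Kafka involved):

```java
import java.util.function.Function;

public class ErasureCastDemo {

    // Stand-in for the POJO from the question.
    static class FooHeartbeat { }

    // Feeds an arbitrary value to a mapper typed for String, the way an erased
    // stream processor receives whatever the serde actually produced.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean failsOnWrongType(Object value) {
        Function<String, String> mapper = s -> s.toUpperCase();
        Function raw = mapper; // generics are erased: no check at wiring time
        try {
            raw.apply(value);  // the implicit cast to String happens only here
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsOnWrongType("a string"));         // prints "false"
        System.out.println(failsOnWrongType(new FooHeartbeat())); // prints "true"
    }
}
```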