java spring-boot apache-kafka spring-kafka spring-cloud-stream-binder-kafka

Spring Boot 3.2.9, Spring Cloud Stream Kafka binder: ClassCastException - class java.lang.Byte cannot be cast to class java.util.UUID


I created an application based on the Spring sample batch-producer-consumer.

I made a few changes to the sample: the producer and the consumer/processor are separate applications.

The processor receives the events but fails with a ClassCastException. I am not sure why the error is thrown at the toList() call (CloudstreamKafkaconsumerApplication.java:34).

The source https://github.com/kswat/cloudstream-kafka

https://github.com/kswat/cloudstream-kafkaconsumer

Note: I have a simple docker-compose file that spins up ZooKeeper and the brokers (standard setup).

Stacktrace

Caused by: java.lang.ClassCastException: class java.lang.Byte cannot be cast to class java.util.UUID (java.lang.Byte and java.util.UUID are in module java.base of loader 'bootstrap')
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) ~[na:na]
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:575) ~[na:na]
at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260) ~[na:na]
at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616) ~[na:na]
at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:622) ~[na:na]
at java.base/java.util.stream.ReferencePipeline.toList(ReferencePipeline.java:627) ~[na:na]
at com.example.CloudstreamKafkaconsumerApplication.lambda$sanitizingConsumer$2(CloudstreamKafkaconsumerApplication.java:34) ~[classes/:na]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunctionAndEnrichResultIfNecessary(SimpleFunctionRegistry.java:958) ~[spring-cloud-function-context-4.1.0.jar:4.1.0]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunction(SimpleFunctionRegistry.java:904) ~[spring-cloud-function-context-4.1.0.jar:4.1.0]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:740) ~[spring-cloud-function-context-4.1.0.jar:4.1.0]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:580) ~[spring-cloud-function-context-4.1.0.jar:4.1.0]
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:92) ~[spring-cloud-stream-4.1.0.jar:4.1.0]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:832) ~[spring-cloud-stream-4.1.0.jar:4.1.0]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:661) ~[spring-cloud-stream-4.1.0.jar:4.1.0]
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.1.2.jar:6.1.2]
... 40 common frames omitted

I am using (almost) the latest Spring Boot and Spring Cloud versions from start.spring.io.

Is some serialization or deserialization configuration required?

How can I resolve this issue?

Edit: For single values, it works.



Solution

  • This is a configuration issue. To resolve it, enable batch-mode on the consumer binding. With this setting the incoming payloads are delivered to the function as a collection, which a batch consumer (one that takes a List) requires.

    Try the following configuration change in your application.yaml:

    spring.cloud.stream.bindings.sanitizingConsumer-in-0.consumer.batch-mode: true
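
As for why the exception only appears at toList(): Java generics are erased at runtime, so a function declared to take a List<UUID> can be handed a list with other element types, and the cast to UUID fails only when the stream pipeline touches an element. The following is a minimal plain-Java sketch of that effect, not the binder's actual code path. The class name BatchErasureDemo, the helper failsWithClassCast, and the way the Byte list is built are assumptions made for illustration; the trace only suggests that, without batch-mode, the list reaching the function contained Byte values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.function.Consumer;

public class BatchErasureDemo {

    // Hypothetical stand-in for the sanitizingConsumer bean from the question.
    static Consumer<List<UUID>> sanitizingConsumer() {
        return ids -> {
            // Elements are cast to UUID only when the pipeline runs,
            // so a wrong element type surfaces here, inside toList().
            List<String> asText = ids.stream().map(id -> id.toString()).toList();
            System.out.println("Received " + asText.size() + " ids");
        };
    }

    // Invokes the consumer through a raw type, mimicking a framework that
    // calls the function without knowing its generic element type.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean failsWithClassCast(List<?> payload) {
        try {
            ((Consumer) sanitizingConsumer()).accept(payload);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Assumed shape of the non-batch input: a list of boxed bytes.
        List<Object> bytes = new ArrayList<>();
        for (byte b : "payload".getBytes()) {
            bytes.add(b);
        }
        System.out.println("Byte elements fail: " + failsWithClassCast(bytes));

        // With real UUID elements (what batch-mode is meant to provide) it works.
        List<UUID> uuids = List.of(UUID.randomUUID(), UUID.randomUUID());
        System.out.println("UUID elements fail: " + failsWithClassCast(uuids));
    }
}
```

This also fits the observation that single values work: a function taking one UUID never iterates over a mistyped list.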