We are encountering the following exception after upgrading our batch processor to Spring Cloud Stream Binder for Kinesis version 4.0.0. Note that we've been using version 2.2.0 of the binder until now. We're also using Spring Cloud version 2023.0.0 and Spring Boot version 3.2.2.
java.lang.ClassCastException: class org.springframework.messaging.support.GenericMessage cannot be cast to class [B (org.springframework.messaging.support.GenericMessage is in unnamed module of loader org.springframework.boot.loader.launch.LaunchedClassLoader @60addb54; [B is in module java.base of loader 'bootstrap')
Our consumer bean is defined as follows:
@Bean
public Consumer<Message<List<byte[]>>> batchConsumer(
        BatchItemProcessor batchItemProcessor) {
    return batchItemProcessor::consume;
}
Our properties are defined as follows:
spring:
  cloud:
    stream:
      bindings:
        batchItemProcessor-in-0:
          consumer:
            back-off-initial-interval: 1000
            back-off-max-interval: 10000
            back-off-multiplier: 2.0
            batch-mode: true
            max-attempts: 5
            use-native-decoding: true
          content-type: application/octet-stream
          destination: batchItems
      kinesis:
        binder:
          headers:
            - traceparent
          kpl-kcl-enabled: true
        bindings:
          batchItemProcessor-in-0:
            consumer:
              checkpoint-mode: batch
              listener-mode: batch
Did anything change from the older 2.2.0 release of the binder that would cause our code to fail with this error? Are we missing any required properties, or is our consumer code specified incorrectly for processing messages in batch mode?
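The KclMessageDrivenChannelAdapter has always returned a List<Message<Object>> built from the deserialized Kinesis records and their embedded headers. Apparently something changed in Spring Cloud Function between those releases, so the list elements now reach your function as Message instances rather than raw byte[] payloads, which is what the ClassCastException shows.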
To fix the problem, change your Consumer to this:
Consumer<Message<List<Message<byte[]>>>>
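To illustrate why the old signature fails and what the new one has to do, here is a minimal sketch that runs without Spring on the classpath. The `Message` record is a hypothetical stand-in for `org.springframework.messaging.Message`, and `sampleBatch`, `recordOf`, and the class name are made up for the example; it only mimics the nested shape described above and is not the binder's actual code path. In a real application you would read each element through Spring's `getPayload()` instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class BatchUnwrapSketch {

    // Hypothetical stand-in for org.springframework.messaging.Message,
    // so the sketch compiles without Spring dependencies.
    record Message<T>(T payload, Map<String, Object> headers) {}

    // Builds one per-record message with an example "traceparent" header.
    static Message<byte[]> recordOf(String body, String traceparent) {
        return new Message<>(body.getBytes(StandardCharsets.UTF_8),
                Map.of("traceparent", traceparent));
    }

    // Assumed batch shape: an outer message whose payload is a list of
    // per-record messages, each carrying its own payload and headers.
    static Message<List<Message<byte[]>>> sampleBatch() {
        List<Message<byte[]>> records = List.of(recordOf("a", "t1"), recordOf("b", "t2"));
        return new Message<>(records, Map.of());
    }

    // New signature: unwrap each inner message before touching the bytes.
    static List<String> consumeWithNewSignature(Message<List<Message<byte[]>>> batch) {
        List<String> bodies = new ArrayList<>();
        Consumer<Message<List<Message<byte[]>>>> consumer = b -> {
            for (Message<byte[]> item : b.payload()) {
                bodies.add(new String(item.payload(), StandardCharsets.UTF_8));
            }
        };
        consumer.accept(batch);
        return bodies;
    }

    // Old signature: generics are erased at runtime, so the mismatch only
    // surfaces when an element is cast to byte[] inside the loop.
    @SuppressWarnings("unchecked")
    static boolean oldSignatureThrows(Message<?> batch) {
        Consumer<Message<List<byte[]>>> consumer = b -> {
            for (byte[] bytes : b.payload()) {
                int ignored = bytes.length;
            }
        };
        try {
            consumer.accept((Message<List<byte[]>>) batch);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(consumeWithNewSignature(sampleBatch()));
        System.out.println(oldSignatureThrows(sampleBatch()));
    }
}
```

With the nested signature, the per-record headers (such as traceparent in this sketch) stay available on each inner message, so the processor can read them alongside the payload.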