
Batch processing error encountered after upgrade to Spring Cloud Stream Binder for Kinesis 4.0.0


We are encountering the following exception after upgrading our batch processor from version 2.2.0 to version 4.0.0 of the Spring Cloud Stream Binder for Kinesis. We're also using Spring Cloud 2023.0.0 and Spring Boot 3.2.2.

java.lang.ClassCastException: class org.springframework.messaging.support.GenericMessage cannot be cast to class [B (org.springframework.messaging.support.GenericMessage is in unnamed module of loader org.springframework.boot.loader.launch.LaunchedClassLoader @60addb54; [B is in module java.base of loader 'bootstrap')

Our consumer bean is defined as follows:

@Bean
public Consumer<Message<List<byte[]>>> batchConsumer(
    BatchItemProcessor batchItemProcessor) {
  return batchItemProcessor::consume;
}

Our properties are defined as follows:

spring:
  cloud:
    stream:
      bindings:
        batchItemProcessor-in-0:
          consumer:
            back-off-initial-interval: 1000
            back-off-max-interval: 10000
            back-off-multiplier: 2.0
            batch-mode: true
            max-attempts: 5
            use-native-decoding: true
          content-type: application/octet-stream
          destination: batchItems
      kinesis:
        binder:
          headers:
            - traceparent
          kpl-kcl-enabled: true
        bindings:
          batchItemProcessor-in-0:
            consumer:
              checkpoint-mode: batch
              listener-mode: batch

Did anything change since the 2.2.0 release of the binder that would cause our code to fail with this error? Are we missing any required properties, or is our consumer declared incorrectly for processing messages in batch mode?


Solution

  • The KclMessageDrivenChannelAdapter has always returned a List<Message<Object>>, built from the deserialized Kinesis record and its embedded headers. The ClassCastException is consistent with this: each list element is a GenericMessage, not a byte[]. Apparently something changed in Spring Cloud Function between the two versions.

    To fix the problem, change your Consumer type to this:

    Consumer<Message<List<Message<byte[]>>>>
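
The difference between the two signatures can be illustrated with a minimal sketch that runs without Spring. Everything in it is an assumption made for illustration: `Msg` is a hypothetical stand-in for `org.springframework.messaging.Message`, and `oldConsumer`, `newConsumer`, `sampleBatch`, and `oldConsumerFails` are made-up names. It is not the binder's actual code path. It only shows why a list of messages fails against the `List<byte[]>` signature after type erasure, and how the nested signature unwraps each record.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchConsumerSketch {

    // Hypothetical stand-in for org.springframework.messaging.Message,
    // used only so this sketch runs without Spring on the classpath.
    static final class Msg<T> {
        private final T payload;
        Msg(T payload) { this.payload = payload; }
        T getPayload() { return payload; }
    }

    // Old signature: assumes every list element is a raw byte[].
    static Consumer<Msg<List<byte[]>>> oldConsumer(List<String> sink) {
        return batch -> {
            for (byte[] item : batch.getPayload()) {
                sink.add(new String(item, StandardCharsets.UTF_8));
            }
        };
    }

    // New signature: every list element is itself a message wrapping a byte[].
    static Consumer<Msg<List<Msg<byte[]>>>> newConsumer(List<String> sink) {
        return batch -> {
            for (Msg<byte[]> item : batch.getPayload()) {
                sink.add(new String(item.getPayload(), StandardCharsets.UTF_8));
            }
        };
    }

    // Builds a batch whose payload is a list of messages, as described in the answer.
    static Msg<List<Msg<byte[]>>> sampleBatch() {
        List<Msg<byte[]>> records = new ArrayList<>();
        records.add(new Msg<>("a".getBytes(StandardCharsets.UTF_8)));
        records.add(new Msg<>("b".getBytes(StandardCharsets.UTF_8)));
        return new Msg<>(records);
    }

    // Passes the nested batch to the old-style consumer through a raw type.
    // Generics are erased at runtime, so the failure only surfaces when an
    // element is cast to byte[] inside the loop.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean oldConsumerFails() {
        Consumer raw = oldConsumer(new ArrayList<>());
        try {
            raw.accept(sampleBatch());
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        newConsumer(sink).accept(sampleBatch());
        System.out.println(sink);               // [a, b]
        System.out.println(oldConsumerFails()); // true
    }
}
```

In a real application the same pattern would apply to the `@Bean` consumer: iterate over `message.getPayload()` and call `getPayload()` on each inner `Message<byte[]>` to obtain the record bytes.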