spring-cloud-streamspring-integration-aws

SerializationFailedException: Spring Cloud Stream Kinesis binder with kpl-kcl-enabled:true


I am currently evaluating the possibility of using spring cloud stream kinesis binder in a new project but I am having some issues.

When I have kcl-kpl-enabled: false everything works fine. However when I have kcl-kop-enabled I keep getting the following error:

org.springframework.core.serializer.support.SerializationFailedException: Failed to deserialize payload. Is the byte array a result of corresponding serialization for DefaultDeserializer?; nested exception is java.io.StreamCorruptedException: invalid stream header: 61686868

This is my current configuration:

spring:
  cloud:
    stream:
      kinesis:
        binder:
          checkpoint:
            create-delay: 0
            table: feeder_mycollection_changes_table
          kpl-kcl-enabled: true
        bindings:
          processEvent-in-0:
            consumer:
              shardIteratorType: TRIM_HORIZON
      bindings:
        processEvent-in-0:
          destination: mycollection_changes_stream
          content-type: application/json
          consumer:
            headerMode: none

The versions of the dependencies that I am using on my tests are:

<spring-cloud.version>Hoxton.RC2</spring-cloud.version>
<spring-cloud-stream.version>Horsham.RC2</spring-cloud-stream.version>
<spring-cloud-stream-kinesis.version>2.0.0.BUILD-SNAPSHOT</spring-cloud-stream-kinesis.version>

Solution

  • The fix is here: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/commit/b64cd10c5b5aac209b61399f81e2801f24fbbaf4

    The problem is that a default converter in the KclMessageDrivenChannelAdapter is a DeserializingConverter.

    Spring Cloud Stream doesn't deal with Java serialization and it has its own mechanism to convert byte[]. So, we needed to fix a Binder implementation to rely on the Spring Cloud Stream conversion infrastructure.