apache-kafka-streams, spring-cloud-stream, hive-serde

Error while serializing aggregate state store with custom serde on Spring Cloud Stream


I'm trying to create a simple functional bean with Spring Cloud Stream that processes messages from a KStream and a GlobalKTable, joins them, aggregates the result, and outputs it to a new stream, but I'm having trouble configuring the serdes it needs.

Without further ado, here is my method:

@Bean
public BiFunction<KStream<GenericRecord, GenericRecord>, GlobalKTable<Long, GenericRecord>, KStream<String, MyCustomJavaClass>> joinAndAggregate() {

    return (stream, table) -> stream
            .join(table,
                    (streamKey, streamValue) -> (Long) streamValue.get("something"),
                    (streamValue, tableValue) -> {
                        return new MyCustomJavaClass(streamValue, tableValue);
                    }).selectKey((key, value) -> (Long) key.get("id"))
            .groupBy((key, value) -> value.getKey(), Grouped.with(Serdes.String(), new MyCustomSerde()))
            .aggregate(() -> {
                return new MyCustomJavaClass();
            }, (key, value, aggregatedValue) -> {
                // aggregation logic
                return new MyCustomJavaClass(aggregatedData);
            }).toStream()
            .peek((k, v) -> {
                if (v == null)
                    log.warn("No value for key:\n" + k.toString() + "\n");
                else
                    log.info("Aggregated result with key:\n" + k + "\nvalue:\n" + v.toString() + "\n");
            });
}

public static final class MyCustomSerde extends JsonSerde<MyCustomJavaClass> { }

This is the configuration in my properties file:

spring.application.name: test-application
spring.cloud.stream.kafka.binder.brokers: kafka-svc:9092
spring.kafka.properties.schema.registry.url: http://schema-registry-svc:8081
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.function.definition: joinAndAggregate
spring.cloud.stream.bindings.joinAndAggregate-in-0.destination: input-stream
spring.cloud.stream.bindings.joinAndAggregate-in-1.destination: input-global-ktable
spring.cloud.stream.bindings.joinAndAggregate-out-0.destination: aggregate-output
# Serdes
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-0.consumer.application-id: joinAndAggregate-in-0-v0.1.0
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-0.consumer.key-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-0.consumer.value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-1.consumer.application-id: joinAndAggregate-in-1-v0.1.0
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-in-1.consumer.value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
spring.cloud.stream.kafka.streams.bindings.joinAndAggregate-out-0.producer.value-serde: com.package.MyClass$MyCustomSerde

When I run the code above I get the following error:

Failed to process stream task 2_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_0, processor=KSTREAM-SOURCE-0000000011, topic=joinAndAggregate-in-0-v0.1.0-KSTREAM-AGGREGATE-STATE-STORE-0000000007-repartition, partition=0, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException: 
A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual value type (value type: com.package.model.MyCustomJavaClass). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
        at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:204)
    ... <omitting some lines here> ...
Caused by: java.lang.ClassCastException: class com.package.model.MyCustomJavaClass cannot be cast to class [B (com.package.model.MyCustomJavaClass is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)

The class com.package.model.MyCustomJavaClass resides in a different package from MyClass, where the functional stream method is defined. Could that be the problem?

I also verified that MyCustomJavaClass can be serialized and deserialized properly using the custom serde shown above (MyCustomSerde), which is just a simple serde extending JsonSerde; what I mean by "verified" is sketched below. Other functional methods that I omitted here consume and produce messages whose values are serialized with MyCustomSerde without any issues, so the serde and the custom Java class are not the problem. Somehow only the aggregate state store has trouble with my custom serde, and I can't find a way to fix it by looking at examples and documentation.
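A minimal round-trip check of that kind might look like the following (the topic name is arbitrary for a JsonSerde, the constructor arguments are placeholders, and the assert assumes MyCustomJavaClass implements equals()):

// Round-trip sketch: serialize, deserialize, compare.
MyCustomSerde serde = new MyCustomSerde();
MyCustomJavaClass original = new MyCustomJavaClass(/* ... */);

byte[] bytes = serde.serializer().serialize("any-topic", original);
MyCustomJavaClass restored = serde.deserializer().deserialize("any-topic", bytes);

// Assumes MyCustomJavaClass implements equals()
assert original.equals(restored);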

What is it that I am doing wrong?

Thanks in advance!


Solution

When you see an error like:

    A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual value type (value type: com.package.model.MyCustomJavaClass). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

it means Kafka Streams used a (de)serializer that doesn't match the actual type of the record. In this case, the values in the aggregate's state store fell back to the default Serdes.ByteArraySerde: the value serde you pass via Grouped.with() covers the repartition topic but is not carried over to the state store that aggregate() creates (the aggregation may change the value type), so the store needs its own serdes. If you update your aggregate method and add a third parameter, Materialized.with(Serdes.String(), new MyCustomSerde()), your application should get past this error.

    
.aggregate(() -> {
    return new MyCustomJavaClass();
}, (key, value, aggregatedValue) -> {
    // aggregation logic
    return new MyCustomJavaClass(aggregatedData);
}, Materialized.with(Serdes.String(), new MyCustomSerde()))
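If you also want to name the state store (for example, to query it later), the same serdes can be supplied through the Materialized.as(...) builder instead. This is just a sketch: the store name is illustrative, and it needs imports for org.apache.kafka.common.utils.Bytes and org.apache.kafka.streams.state.KeyValueStore:

.aggregate(() -> {
    return new MyCustomJavaClass();
}, (key, value, aggregatedValue) -> {
    // aggregation logic
    return new MyCustomJavaClass(aggregatedData);
}, Materialized.<String, MyCustomJavaClass, KeyValueStore<Bytes, byte[]>>as("aggregate-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(new MyCustomSerde()))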
    

Let me know how it goes.

-Bill