java, spring-boot, apache-kafka-streams, spring-cloud-stream

Unknown magic byte with Spring Cloud Stream aggregation


I'm developing a Spring Cloud Stream app that consumes messages in Avro format. After setting everything up and starting the app, I get an "Unknown magic byte" error. I suspect this is because I'm using SpecificAvroSerde.

Here's my code:

application.yml:

spring:
  cloud:
    function:
      definition: employeeSalaryUpdateFlow
    stream:
      bindings:
        employeeSalaryUpdateFlow-in-0:
          destination: employee-topic
      kafka:
        streams:
          binder:
            applicationId: kafka-streams-app
            brokers: localhost:9092
            configuration:
              schema.registry.url: http://localhost:8081
              processing.guarantee: exactly_once
              commit.interval.ms: 10000
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          bindings:
            employeeSalaryUpdateFlow-in-0:
              consumer:
                materializedAs: employeeSalaryUpdateFlow-store
                value-serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

Processor.java

@Bean
public Consumer<KStream<String, Employee>> employeeSalaryUpdateFlow() {
    SpecificAvroSerde<Employee> avroSerde = new SpecificAvroSerde<>();
    Map<String, String> serdeConfig = new HashMap<>();
    serdeConfig.put("schema.registry.url", "http://localhost:8081"); // Set your schema registry URL
    avroSerde.configure(serdeConfig, false);
    return input -> {
        input
                .map((k, v) -> {
                    log.info("Hey");
                    return KeyValue.pair(v.getId(), v);
                })
                .toTable()
                .groupBy((k, v) -> KeyValue.pair(v.getDepartment(), v), Grouped.with(Serdes.String(), avroSerde))
                .aggregate(() -> employeeSalaryRecordBuilder.init(),
                        (k, v, agg) -> employeeSalaryRecordBuilder.aggregate(v, agg),
                        (k, v, agg) -> employeeSalaryRecordBuilder.substract(v, agg))
                .toStream()
                .foreach((k, v) -> log.info(String.format("Department: %s, Total Salary: %f", k, v.getTotalSalary())));
    };
}

Employee.avro

{
  "namespace": "me.model",
  "type": "record",
  "name": "Employee",
  "fields": [
    {"name": "id","type": ["null","string"]},
    {"name": "name","type": ["null","string"]},
    {"name": "department","type": ["null","string"]},
    {"name": "salary","type": ["null","int"]}
  ]
}

Solution

  • I found the answer. The error message is quite misleading: what I was actually missing was the Schema Registry service. You can use my setup below to run the whole cluster.

    docker-compose.yaml

    version: '3'
    
    services:
      zoo1:
        image: confluentinc/cp-zookeeper:7.5.3
        hostname: zoo1
        container_name: zoo1
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_SERVER_ID: 1
          ZOOKEEPER_SERVERS: zoo1:2888:3888
    
      kafka1:
        image: confluentinc/cp-kafka:7.5.3
        hostname: kafka1
        container_name: kafka1
        ports:
          - "9092:9092"
          - "29092:29092"
        environment:
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
          KAFKA_BROKER_ID: 1
          KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
          KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
          KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
        depends_on:
          - zoo1
    
      kafka2:
        image: confluentinc/cp-kafka:7.5.3
        hostname: kafka2
        container_name: kafka2
        ports:
          - "9093:9093"
          - "29093:29093"
        environment:
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
          KAFKA_BROKER_ID: 2
          KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
          KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
          KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
        depends_on:
          - zoo1
    
      kafka3:
        image: confluentinc/cp-kafka:7.5.3
        hostname: kafka3
        container_name: kafka3
        ports:
          - "9094:9094"
          - "29094:29094"
        environment:
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
          KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
          KAFKA_BROKER_ID: 3
          KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
          KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
          KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
        depends_on:
          - zoo1

      schema-registry:
        image: confluentinc/cp-schema-registry:7.5.3
        hostname: schema-registry
        container_name: schema-registry
        ports:
          - "8081:8081"
        depends_on:
          - zoo1
          - kafka1
          - kafka2
          - kafka3
        environment:
          SCHEMA_REGISTRY_HOST_NAME: schema-registry
          SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka1:19092,kafka2:19093,kafka3:19094
          SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    

    After starting Schema Registry, my application started consuming messages.
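
    For context: the "Unknown magic byte" error is raised by Confluent's KafkaAvroDeserializer when a record does not start with the header it expects. Confluent's wire format prepends a magic byte 0x0 and a 4-byte schema ID (looked up in Schema Registry) to the Avro payload, so any message that was not produced with KafkaAvroSerializer will fail with exactly this error. A minimal sketch that checks a raw payload (the helper class is mine, not part of any library):

    import java.nio.ByteBuffer;

    public class WireFormatInspector {

        // Confluent wire format: [magic byte 0x0][4-byte big-endian schema ID][Avro binary payload]
        public static void inspect(byte[] payload) {
            if (payload == null || payload.length < 5 || payload[0] != 0x0) {
                // Payloads without this header are what KafkaAvroDeserializer rejects
                throw new IllegalArgumentException("Not in Confluent wire format");
            }
            int schemaId = ByteBuffer.wrap(payload, 1, 4).getInt();
            System.out.println("Schema ID registered in Schema Registry: " + schemaId);
        }
    }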

    Bonus: I've found that there are very few examples of how to develop a Spring Cloud Stream application with aggregation, so I'll put my code snippet here in case someone needs it.

    Note that I'm using kafka-avro-serializer and kafka-streams-avro-serde version 7.5.9 together with kafka-streams 3.9.1.
    DepartmentAggregate.avro

    {
      "namespace": "vin.kafka_stream_spring_cloud.kafka.model",
      "type": "record",
      "name": "DepartmentAggregate",
      "fields": [
        {"name": "total_salary","type": ["null","int"]},
        {"name": "employee_count","type": ["null","int"]},
        {"name": "avg_salary","type": ["null","double"]}
      ]
    }
    

    Processor.java

    @Bean
    public Consumer<KStream<String, Employee>> employeeSalaryUpdateFlow() {
        SpecificAvroSerde<Employee> employeeAvroSerde = new SpecificAvroSerde<>();
        Map<String, String> employeeSerdeConfig = new HashMap<>();
        employeeSerdeConfig.put("schema.registry.url", "http://localhost:8081"); // Set your schema registry URL
        employeeAvroSerde.configure(employeeSerdeConfig, false);
    
        SpecificAvroSerde<DepartmentAggregate> departmentAggregateAvroSerde = new SpecificAvroSerde<>();
        Map<String, String> departmentAggregateSerdeConfig = new HashMap<>();
        departmentAggregateSerdeConfig.put("schema.registry.url", "http://localhost:8081"); // Set your schema registry URL
        departmentAggregateAvroSerde.configure(departmentAggregateSerdeConfig, false);
    
        // Materialize the de-duplicated employee KTable into a named persistent store
        KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("employeeSalaryUpdateFlow-store");
        Materialized<String, Employee, KeyValueStore<Bytes, byte[]>> materialized = Materialized.<String, Employee>as(storeSupplier)
                .withKeySerde(Serdes.String())
                .withValueSerde(employeeAvroSerde);
    
        return input -> {
            input
                    .map((k, v) -> KeyValue.pair(v.getId(), v))
                    .toTable(materialized) // use the named persistent store defined above
                    .groupBy((k, v) -> KeyValue.pair(v.getDepartment(), v), Grouped.with(Serdes.String(), employeeAvroSerde))
                    .aggregate(() -> employeeSalaryRecordBuilder.init(),
                            (k, v, agg) -> employeeSalaryRecordBuilder.aggregate(v, agg),
                            (k, v, agg) -> employeeSalaryRecordBuilder.substract(v, agg),
                            Materialized.with(Serdes.String(), departmentAggregateAvroSerde))
                    .toStream()
                    .foreach((k, v) -> log.info(String.format("Department: %s, Total Salary: %s", k, v.getTotalSalary())));
        };
    }
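
    The snippets above call employeeSalaryRecordBuilder without showing it. Here is a minimal sketch consistent with the DepartmentAggregate schema; only the signatures are dictated by the aggregate() call above, the method bodies are my assumption:

    public class EmployeeSalaryRecordBuilder {

        // Initializer: an empty aggregate for a department
        public DepartmentAggregate init() {
            return DepartmentAggregate.newBuilder()
                    .setTotalSalary(0)
                    .setEmployeeCount(0)
                    .setAvgSalary(0.0)
                    .build();
        }

        // Adder: applied when an employee row is added to (or updated in) a department
        public DepartmentAggregate aggregate(Employee v, DepartmentAggregate agg) {
            int totalSalary = agg.getTotalSalary() + v.getSalary();
            int employeeCount = agg.getEmployeeCount() + 1;
            return DepartmentAggregate.newBuilder()
                    .setTotalSalary(totalSalary)
                    .setEmployeeCount(employeeCount)
                    .setAvgSalary((double) totalSalary / employeeCount)
                    .build();
        }

        // Subtractor: applied to the old value when a KTable row changes or is deleted
        // (the spelling matches the method referenced in the processor above)
        public DepartmentAggregate substract(Employee v, DepartmentAggregate agg) {
            int totalSalary = agg.getTotalSalary() - v.getSalary();
            int employeeCount = agg.getEmployeeCount() - 1;
            return DepartmentAggregate.newBuilder()
                    .setTotalSalary(totalSalary)
                    .setEmployeeCount(employeeCount)
                    .setAvgSalary(employeeCount == 0 ? 0.0 : (double) totalSalary / employeeCount)
                    .build();
        }
    }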
    

    Note: if you use the IntelliJ Kafka plugin to produce an Avro message, there is a chance the consumer won't be able to read it (the plugin may not write the Confluent wire format). Instead, use another producer application that publishes messages with the same Avro schema as your consumer app (in my case, Employee.avro).
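
    For producing test messages, a bare-bones alternative to a full Spring Cloud Stream producer is a plain KafkaProducer with Confluent's KafkaAvroSerializer, which registers the schema and writes the wire format the consumer expects. Topic name and URLs match the setup above; treat it as a sketch, not production code:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class EmployeeProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // KafkaAvroSerializer prepends the magic byte + schema ID and registers the schema
            props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
            props.put("schema.registry.url", "http://localhost:8081");

            Employee employee = Employee.newBuilder()
                    .setId("e-1")
                    .setName("Alice")
                    .setDepartment("engineering")
                    .setSalary(1000)
                    .build();

            try (KafkaProducer<String, Employee> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("employee-topic", employee.getId().toString(), employee));
            }
        }
    }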