I'm developing a Spring Cloud Stream app that accepts messages in Avro format. After setting everything up and starting the app, I get an "Unknown magic byte" error. I suspect this is because I'm using SpecificAvroSerde.
Here's my code:
application.yml:
spring:
  cloud:
    function:
      definition: employeeSalaryUpdateFlow
    stream:
      bindings:
        employeeSalaryUpdateFlow-in-0:
          destination: employee-topic
      kafka:
        streams:
          binder:
            applicationId: kafka-streams-app
            brokers: localhost:9092
            configuration:
              schema.registry.url: http://localhost:8081
              processing.guarantee: exactly_once
              commit.interval.ms: 10000
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          bindings:
            employeeSalaryUpdateFlow-in-0:
              consumer:
                materializedAs: employeeSalaryUpdateFlow-store
                value-serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
Processor.java
@Bean
public Consumer<KStream<String, Employee>> employeeSalaryUpdateFlow() {
    SpecificAvroSerde<Employee> avroSerde = new SpecificAvroSerde<>();
    Map<String, String> serdeConfig = new HashMap<>();
    serdeConfig.put("schema.registry.url", "http://localhost:8081"); // Set your schema registry URL
    avroSerde.configure(serdeConfig, false); // false = value serde
    return input -> {
        input
            .map((k, v) -> {
                log.info("Hey");
                return KeyValue.pair(v.getId(), v);
            })
            .toTable()
            .groupBy((k, v) -> KeyValue.pair(v.getDepartment(), v), Grouped.with(Serdes.String(), avroSerde))
            .aggregate(() -> employeeSalaryRecordBuilder.init(),
                    (k, v, agg) -> employeeSalaryRecordBuilder.aggregate(v, agg),
                    (k, v, agg) -> employeeSalaryRecordBuilder.substract(v, agg))
            .toStream()
            .foreach((k, v) -> log.info(String.format("Department: %s, Total Salary: %f", k, v.getTotalSalary())));
    };
}
Employee.avro
{
  "namespace": "me.model",
  "type": "record",
  "name": "Employee",
  "fields": [
    {"name": "id", "type": ["null", "string"]},
    {"name": "name", "type": ["null", "string"]},
    {"name": "department", "type": ["null", "string"]},
    {"name": "salary", "type": ["null", "int"]}
  ]
}
I found the answer. The error message is quite misleading: what was actually missing was a running Schema Registry service. You can use my setup below to run the whole cluster.
docker-compose.yaml
version: '3'
services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.5.3
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888

  kafka1:
    image: confluentinc/cp-kafka:7.5.3
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1

  kafka2:
    image: confluentinc/cp-kafka:7.5.3
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1

  kafka3:
    image: confluentinc/cp-kafka:7.5.3
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.3
    hostname: schema-registry
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - zoo1
      - kafka1
      - kafka2
      - kafka3
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka1:19092,kafka2:19093,kafka3:19094
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
After starting Schema Registry, my application started consuming messages.
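If you want to confirm the registry is actually reachable before starting the app, a minimal check against its REST API (the standard GET /subjects endpoint) could look roughly like this; the class name and expected subject are just illustrative:

// Quick sketch (not from the original setup): calls the Schema Registry REST API
// to list registered subjects. With the config above, the value subject for the
// topic would typically be "employee-topic-value" once a message has been produced.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SchemaRegistryCheck {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/subjects"))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // Expect HTTP 200 and a JSON array, e.g. ["employee-topic-value"]
        System.out.println(response.statusCode() + " " + response.body());
    }
}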
Bonus: I've found that there are very few examples of how to develop a Spring Cloud Stream application with aggregation, so I'll put my code snippet here in case someone needs it.
Note that I'm using kafka-avro-serializer and kafka-streams-avro-serde version 7.5.9 together with kafka-streams 3.9.1.
DepartmentAggregate.avro
{
  "namespace": "vin.kafka_stream_spring_cloud.kafka.model",
  "type": "record",
  "name": "DepartmentAggregate",
  "fields": [
    {"name": "total_salary", "type": ["null", "int"]},
    {"name": "employee_count", "type": ["null", "int"]},
    {"name": "avg_salary", "type": ["null", "double"]}
  ]
}
Processor.java
@Bean
public Consumer<KStream<String, Employee>> employeeSalaryUpdateFlow() {
    SpecificAvroSerde<Employee> employeeAvroSerde = new SpecificAvroSerde<>();
    Map<String, String> employeeSerdeConfig = new HashMap<>();
    employeeSerdeConfig.put("schema.registry.url", "http://localhost:8081"); // Set your schema registry URL
    employeeAvroSerde.configure(employeeSerdeConfig, false); // false = value serde

    SpecificAvroSerde<DepartmentAggregate> departmentAggregateAvroSerde = new SpecificAvroSerde<>();
    Map<String, String> departmentAggregateSerdeConfig = new HashMap<>();
    departmentAggregateSerdeConfig.put("schema.registry.url", "http://localhost:8081"); // Set your schema registry URL
    departmentAggregateAvroSerde.configure(departmentAggregateSerdeConfig, false);

    // Back the KTable with a named persistent store matching the materializedAs binding property.
    KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore("employeeSalaryUpdateFlow-store");
    Materialized<String, Employee, KeyValueStore<Bytes, byte[]>> materialized =
            Materialized.<String, Employee>as(storeSupplier)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(employeeAvroSerde);

    return input -> {
        input
            .map((k, v) -> KeyValue.pair(v.getId(), v))
            .toTable(materialized)
            .groupBy((k, v) -> KeyValue.pair(v.getDepartment(), v), Grouped.with(Serdes.String(), employeeAvroSerde))
            .aggregate(() -> employeeSalaryRecordBuilder.init(),
                    (k, v, agg) -> employeeSalaryRecordBuilder.aggregate(v, agg),
                    (k, v, agg) -> employeeSalaryRecordBuilder.substract(v, agg),
                    Materialized.with(Serdes.String(), departmentAggregateAvroSerde))
            .toStream()
            .foreach((k, v) -> log.info(String.format("Department: %s, Total Salary: %s", k, v.getTotalSalary())));
    };
}
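The employeeSalaryRecordBuilder used above is a small injected helper that I didn't include in the snippet. A minimal sketch of what it could look like, assuming the Employee and DepartmentAggregate classes generated from the schemas above (class name and exact math are illustrative, not necessarily my original implementation):

// Illustrative helper for the aggregation above (a sketch, not the exact original).
// init() creates an empty aggregate, aggregate() adds an employee's salary,
// substract() removes it when the upstream KTable entry changes.
@Component
public class EmployeeSalaryRecordBuilder {

    public DepartmentAggregate init() {
        return DepartmentAggregate.newBuilder()
                .setTotalSalary(0)
                .setEmployeeCount(0)
                .setAvgSalary(0.0)
                .build();
    }

    public DepartmentAggregate aggregate(Employee employee, DepartmentAggregate agg) {
        int totalSalary = agg.getTotalSalary() + employee.getSalary();
        int employeeCount = agg.getEmployeeCount() + 1;
        return DepartmentAggregate.newBuilder()
                .setTotalSalary(totalSalary)
                .setEmployeeCount(employeeCount)
                .setAvgSalary((double) totalSalary / employeeCount)
                .build();
    }

    public DepartmentAggregate substract(Employee employee, DepartmentAggregate agg) {
        int totalSalary = agg.getTotalSalary() - employee.getSalary();
        int employeeCount = agg.getEmployeeCount() - 1;
        return DepartmentAggregate.newBuilder()
                .setTotalSalary(totalSalary)
                .setEmployeeCount(employeeCount)
                .setAvgSalary(employeeCount <= 0 ? 0.0 : (double) totalSalary / employeeCount)
                .build();
    }
}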
Note: If you use the IntelliJ Kafka plugin to produce an Avro message, there is a chance the consumer won't be able to read it. Instead, use another Spring Cloud Stream producer application that publishes messages with the same Avro schema as your consumer app (in my case, Employee.avro).
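If you don't have a producer app handy, a minimal plain-Java producer sketch that writes properly framed records could look like this. It uses Confluent's KafkaAvroSerializer so the Confluent wire format (magic byte + schema ID + Avro payload) is produced, which is what SpecificAvroSerde expects on the consumer side; the class name and sample field values are just placeholders:

// Illustrative producer (a sketch, not part of the original post), assuming the
// Employee class is generated from Employee.avro (namespace me.model) and the
// broker/registry addresses from the configuration above.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import me.model.Employee;

public class EmployeeProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");

        // Sample record matching the Employee.avro schema (values are placeholders)
        Employee employee = Employee.newBuilder()
                .setId("e-1")
                .setName("Alice")
                .setDepartment("engineering")
                .setSalary(1000)
                .build();

        try (KafkaProducer<String, Employee> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("employee-topic", employee.getId().toString(), employee));
            producer.flush();
        }
    }
}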