I use spring-cloud-stream-binder-kafka
and spring-cloud-stream
to configure Kafka streaming in the functional style.
My cloud dependencies come from:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>2023.0.0</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
The main dependencies are:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
My yml configuration:
kafka-conf:
  # server: localhost:9101
  server: localhost:9092
  group: test-streams
  consume-topic-report-details: report_details_topic
  report-details-dlq: report_details_dlq
  produce-topic-report-details: report_details_topic_redirect
  schema: http://localhost:8081

spring:
  application:
    name: ${kafka-conf.group}-streaming
  cloud:
    function:
      definition: reportDetails
    stream:
      bindings:
        reportDetails-in-0:
          contentType: application/*+avro
          destination: ${kafka-conf.consume-topic-report-details}
          group: ${kafka-conf.group}-streaming
        reportDetails-out-0:
          contentType: application/*+avro
          destination: ${kafka-conf.produce-topic-report-details}
      kafka:
        streams:
          binder:
            deserialization-exception-handler: sendToDlq
            configuration:
              commit.interval.ms: 100
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
          bindings:
            reportDetails-in-0:
              consumer:
                dlqName: ${kafka-conf.report-details-dlq}
        binder:
          brokers: ${kafka-conf.server}
      schemaRegistryClient:
        endpoint: ${kafka-conf.schema}
The consumed messages are serialised with io.confluent.kafka.serializers.KafkaAvroSerializer.
I expect my service to automatically use io.confluent.kafka.serializers.KafkaAvroDeserializer for every message in the stream, but it seems something is ignored in my yml configuration, and as a result my stream fails
@Bean
Function<ReportDetails, ReportDetails> reportDetails() {
    return data -> {
        log.info("input reportDetails: {}", data);
        return data;
    };
}
with this exception:
Caused by: java.lang.ClassCastException: class [B cannot be cast to class com.vl.model.avro.ReportDetails ([B is in module java.base of loader 'bootstrap'; com.vl.model.avro.ReportDetails is in unnamed module of loader 'app')
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunctionAndEnrichResultIfNecessary(SimpleFunctionRegistry.java:958)
On the other hand, I tried to deserialise it directly (it works).
@Bean
Function<byte[], byte[]> filterAbsence() {
    return dto -> {
        SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient("http://schema-server:8081", 5);
        try (KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(schemaRegistryClient)) {
            // deserialize() returns Object, so the result has to be cast to the target type
            AbsenceDto obj = (AbsenceDto) deserializer.deserialize("report_details_topic", dto);
            log.info("received message: {}", obj);
        }
        log.info("received message: {}", dto);
        return dto;
    };
}
I suspect that my DLQ is configured incorrectly as well. When my consumer fails, I expect the messages to be redirected to the DLQ, but it is empty.
What should I change in my yml configuration to make deserialising work automatically?

From the dependencies you show above and the YAML configuration, it is implied that you are using Kafka Streams. However, the code you shared does not use the Kafka Streams binder in Spring Cloud Stream. It uses the regular Kafka binder, built on the Kafka client libraries (regular producer/consumer). First of all, we need to make sure that this assumption is true. For example, this code has nothing specific to Kafka Streams; it is a regular Java function that will be intercepted by the regular Kafka binder (spring-cloud-stream-binder-kafka in your dependency):
@Bean
Function<ReportDetails, ReportDetails> reportDetails() {
    return data -> {
        log.info("input reportDetails: {}", data);
        return data;
    };
}
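For contrast, and purely as an illustration (this is not code from your project), a function handled by the Kafka Streams binder would be typed against KStream, roughly as below; the bean name and the assumption of a specific-record Avro serde for ReportDetails are mine:

// Illustrative sketch only: with the Kafka Streams binder the function operates on
// KStream, and de/serialization is handled by serdes (e.g. an Avro serde for
// ReportDetails) rather than by message converters.
@Bean
Function<KStream<String, ReportDetails>, KStream<String, ReportDetails>> reportDetailsStream() {
    return stream -> stream.peek((key, value) -> log.info("input reportDetails: {}", value));
}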
If that assumption is true, you can remove your configuration under spring.cloud.stream.kafka.streams.binder and use spring.cloud.stream.kafka.binder instead. The remainder of this answer proceeds under this assumption.
Since you are using Avro, you must provide an appropriate de/serializer -- either a message converter or a Kafka de/serializer. If you'd like to use the first option by providing a message converter, please take a look at this issue for some insights.
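If you want to explore the converter route, the rough shape is a MessageConverter bean that delegates to the Confluent deserializer. The sketch below is only illustrative, not something mandated by that issue: the registry URL, the cache size, the specific.avro.reader flag, and the assumption that you keep contentType: application/*+avro on the binding are all inferred from your configuration.

import java.util.Map;

import com.vl.model.avro.ReportDetails;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.MimeType;

@Configuration
class AvroMessageConverterConfig {

    @Bean
    MessageConverter reportDetailsAvroConverter() {
        // specific.avro.reader=true makes the Confluent deserializer return the
        // generated ReportDetails class instead of a GenericRecord
        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(
                new CachedSchemaRegistryClient("http://localhost:8081", 100),
                Map.of("schema.registry.url", "http://localhost:8081",
                       "specific.avro.reader", true));

        return new AbstractMessageConverter(new MimeType("application", "*+avro")) {
            @Override
            protected boolean supports(Class<?> clazz) {
                return ReportDetails.class.isAssignableFrom(clazz);
            }

            @Override
            protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object hint) {
                // the binding delivers the raw record value as byte[]; the schema id
                // embedded in the record drives the lookup, so no topic is needed here
                return deserializer.deserialize(null, (byte[]) message.getPayload());
            }
        };
    }
}

Spring Cloud Stream adds user-registered MessageConverter beans to its converter stack, so a converter like this would be tried when converting the incoming byte[] payload into ReportDetails.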
If you go with the second option, you must enable native de/serialization on Kafka by turning on nativeEncoding for the producer and nativeDecoding for the consumer in Spring Cloud Stream. In this case, you also need to provide a proper Avro Kafka de/serializer. You can use the one from Confluent for that - kafka-avro-serializer. Once you add this dependency, you need to make the corresponding configuration changes. The following is something that I can infer from your original configuration.
spring:
  application:
    name: ${kafka-conf.group}-streaming
  cloud:
    function:
      definition: reportDetails
    stream:
      bindings:
        reportDetails-in-0:
          destination: ${kafka-conf.consume-topic-report-details}
          group: ${kafka-conf.group}-streaming
          consumer:
            use-native-decoding: true
        reportDetails-out-0:
          destination: ${kafka-conf.produce-topic-report-details}
          producer:
            use-native-encoding: true
      kafka:
        bindings:
          reportDetails-in-0:
            consumer:
              configuration:
                schema.registry.url: http://localhost:8081
                value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
          reportDetails-out-0:
            producer:
              configuration:
                schema.registry.url: http://localhost:8081
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        binder:
          brokers: ${kafka-conf.server}
Pay attention to the way the YAML configuration hierarchy is changed around. First of all, the Kafka Streams binder properties are gone now. Then we set nativeEncoding/decoding on the core bindings. Note that there is no longer a need to set contentType, since we are using native de/serialization on Kafka. Then, specifically on the Kafka bindings (..kafka.bindings..), we set Kafka-specific consumer/producer configuration (such as schema.registry.url, value.deserializer, value.serializer, etc.).
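As an optional usage note (StreamBridge is not part of your code; this is just one way to exercise the output binding), with use-native-encoding enabled the KafkaAvroSerializer configured on the binding does the serialization, so an imperative send needs no contentType handling:

// Hypothetical helper: publish a ReportDetails record through the existing
// output binding; the KafkaAvroSerializer configured on reportDetails-out-0
// serializes it because use-native-encoding is true.
@Component
class ReportDetailsPublisher {

    private final StreamBridge streamBridge;

    ReportDetailsPublisher(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    void publish(ReportDetails details) {
        streamBridge.send("reportDetails-out-0", details);
    }
}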
This should work. If not, please feel free to provide a minimal standalone sample, and we can triage it further.
Here is a sample that uses both Kafka and Kafka Streams binders with Avro that you may want to use as a reference: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kafka-streams-samples/kafka-streams-avro. See the configuration in that app in particular.