I did some experiments with Spring Cloud Stream, with Kafka as the data source.
I needed to configure a DLQ,
and my configuration didn't support it.
So I switched to an open-source example, and it works.
spring.cloud.stream.kafka.streams.binder:
  configuration:
    commit.interval.ms: 1000
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
    application.id: dlq-demo-sample
  serdeError: sendToDlq
spring.cloud.stream.bindings.process-out-0:
  destination: counts
spring.cloud.stream.bindings.process-in-0:
  destination: words
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer:
  dlqName: words-count-dlq
  valueSerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
The example uses <spring-cloud.version>2020.0.2</spring-cloud.version>
and
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.4</version>
    <relativePath/>
</parent>
I upgraded these libraries to newer versions: <spring-cloud.version>2023.0.0</spring-cloud.version>
and
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.2</version>
    <relativePath/>
</parent>
After updating the versions, the usual scenario works as expected (when the value serde is StringSerde), but the DLQ configuration is ignored (using the original yml).
Do you think I upgraded the example to incompatible library versions?
...according to the Spring documentation, they should be compatible:
| Release Train | Spring Boot Generation |
|---|---|
| 2023.0.x aka Leyton | 3.2.x |
...
Do you think I missed some important properties needed to make it work with newer Spring versions?
serde-error in your configuration (serdeError: sendToDlq) has been deprecated for a while and was removed from the binder in the 4.0.x version, in favor of deserialization-exception-handler. See the ref docs for details: https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-streams-binder/error-handling.html
Can you use that new property and try again? We will update the sample you referenced above to reflect the new configuration.
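
For illustration, here is a minimal sketch of how the binder section of the original yml might look with the new property. It assumes the rest of the configuration stays unchanged and that only the removed serdeError line needs replacing; it has not been run against the sample, so treat it as a starting point rather than a verified fix.

```yaml
spring.cloud.stream.kafka.streams.binder:
  configuration:
    commit.interval.ms: 1000
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
    application.id: dlq-demo-sample
  # Replaces the removed "serdeError: sendToDlq" binder property
  deserialization-exception-handler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer:
  # DLQ topic name and value serde carried over unchanged from the original yml
  dlqName: words-count-dlq
  valueSerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
```

The ref docs linked above also describe setting the handler per input binding (under the consumer properties) instead of at the binder level, which may be preferable when only one input should route failed records to a DLQ.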