spring-boot, spring-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Spring Cloud Stream project: Kafka DLQ configuration is ignored with the Leyton release


I did some experiments with Spring Cloud Stream using Kafka as the data source. I need to configure a DLQ, but my own configuration did not work. So I switched to an open source example, and that one really works:

    spring.cloud.stream.kafka.streams.binder:
      configuration:
        commit.interval.ms: 1000
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
        application.id: dlq-demo-sample
      serdeError: sendToDlq
    spring.cloud.stream.bindings.process-out-0:
      destination: counts
    spring.cloud.stream.bindings.process-in-0:
      destination: words
    spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer:
      dlqName: words-count-dlq
      valueSerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde

The example uses <spring-cloud.version>2020.0.2</spring-cloud.version> and

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.4</version>
        <relativePath/>
    </parent>

I upgraded these libraries to newer versions: <spring-cloud.version>2023.0.0</spring-cloud.version> and

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.2</version>
        <relativePath/>
    </parent>

After updating the versions, the usual scenario still works as expected (when the value serde is StringSerde), but the DLQ configuration is ignored (using the original yml).

Did I upgrade the example to incompatible library versions?

According to the Spring documentation, they should be compatible:

| Release Train       | Spring Boot Generation |
|---------------------|------------------------|
| 2023.0.x aka Leyton | 3.2.x                  |
...

Or am I missing some important properties needed to make this work with the newer Spring versions?


Solution

  • The serde-error property in your configuration (serdeError: sendToDlq) had been deprecated for a while and was removed from the binder in version 4.0.x in favor of deserialization-exception-handler. That is why the DLQ setting is ignored after the upgrade. See the reference docs for details: https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-streams-binder/error-handling.html

    Can you use that new property and try again? We will update the sample you referenced above to reflect the new configuration.
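
For illustration, here is a sketch of how the configuration from the question might look with the replacement property. It has not been tested against 2023.0.0. It assumes the binder-level form deserializationExceptionHandler (the camelCase spelling of deserialization-exception-handler, following the same convention as the old serdeError key) and leaves everything else as in the original yml:

```yaml
spring.cloud.stream.kafka.streams.binder:
  configuration:
    commit.interval.ms: 1000
    default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
    application.id: dlq-demo-sample
  # Replaces the removed "serdeError: sendToDlq" setting
  deserializationExceptionHandler: sendToDlq
spring.cloud.stream.bindings.process-out-0:
  destination: counts
spring.cloud.stream.bindings.process-in-0:
  destination: words
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer:
  # Unchanged: records that fail deserialization should be sent to this topic
  dlqName: words-count-dlq
  valueSerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
```

To check it, publish a value to words that IntegerSerde cannot deserialize and see whether it shows up on words-count-dlq.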