How to apply retention time configuration for Dead Letter Queue in Spring Cloud Stream Kafka Binder?


I have an application that uses Spring Cloud Stream Kafka. For user-defined topics, the configuration shown below causes old records to be deleted as expected. The same configuration has no effect on DLQ topics.

For example, in the configuration below I set the retention properties at the binder level. My producer topic (student-topic), defined under bindings, is configured correctly: I can confirm that records are deleted once the topic log exceeds the specified retention.bytes (300000000).

But the binder-level retention settings are not applied to the DLQ topic (person-topic-error-dlq). Is there a different configuration, other than the retention properties, for cleaning up records in DLQ topics?

How can I do this?

spring:
  cloud:
    stream:
      kafka:
        bindings:
          person-topic-in:
            consumer:
              enableDlq: true
              dlqName: person-topic-error-dlq
      binders:
        defaultKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    default:
                      producer:
                        topic:
                          properties:
                            retention.bytes: 300000000
                            segment.bytes: 300000000
                    binder:
                      brokers: localhost:19092
      bindings:
        person-topic-in:
          binder: defaultKafka
          destination: person-topic
          contentType: application/json
          group: person-topic-group
        student-topic-out:
          binder: defaultKafka
          destination: student-topic
          contentType: application/json

Solution

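  • You are only setting the (default) properties for producer bindings, while the DLQ is enabled on a consumer binding.

    That said, adding the same properties under the default consumer section still doesn't work for me: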

          binders:
            defaultKafka:
              type: kafka
              environment:
                spring:
                  cloud:
                    stream:
                      kafka:
                        default:
                          producer:
                            topic:
                              properties:
                                retention.bytes: 300000000
                                segment.bytes: 300000000
                          consumer:
                            topic:
                              properties:
                                retention.bytes: 300000000
                                segment.bytes: 300000000
    

    (the properties are not applied even to the primary topic).

    There appears to be a problem with the default Kafka consumer binding properties.

    Setting the properties on the consumer binding itself works for me; they are applied to both the primary and dead-letter topics:

    spring:
      cloud:
        stream:
          kafka:
            bindings:
              person-topic-in:
                consumer:
                  enableDlq: true
                  dlqName: person-topic-error-dlq
                  topic:
                    properties:
                      retention.bytes: 300000000
                      segment.bytes: 300000000
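
    The question mentions retention time, while the properties above limit the log by size. As a minimal sketch, assuming the same binding names, a time-based limit could be set with Kafka's retention.ms topic property (the 7-day value is only an illustration, not something from the original configuration). The binder documents topic.properties as being used when provisioning new topics, so a DLQ topic that already exists may need to be altered on the broker or recreated before the setting shows up:

    spring:
      cloud:
        stream:
          kafka:
            bindings:
              person-topic-in:
                consumer:
                  enableDlq: true
                  dlqName: person-topic-error-dlq
                  topic:
                    properties:
                      # Assumed example value: 604800000 ms = 7 days
                      retention.ms: 604800000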