I have an application that uses Spring Cloud Stream Kafka. For user-defined topics, I can have records deleted from a topic by using the configuration shown below. However, this configuration doesn't work for DLQ topics.
For example, in the configuration below I set retention at the binder level. My producer topic (student-topic), defined under bindings, is configured correctly: I can see that records are deleted once the topic log exceeds the configured retention.bytes (300000000).
But the binder-level retention setting isn't applied to the DLQ topic (person-topic-error-dlq). Is there a different configuration, other than retention, for cleaning records from DLQ topics?
How can I do this?
spring:
  cloud:
    stream:
      kafka:
        bindings:
          person-topic-in:
            consumer:
              enableDlq: true
              dlqName: person-topic-error-dlq
      binders:
        defaultKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    default:
                      producer:
                        topic:
                          properties:
                            retention.bytes: 300000000
                            segment.bytes: 300000000
                    binder:
                      brokers: localhost:19092
      bindings:
        person-topic-in:
          binder: defaultKafka
          destination: person-topic
          contentType: application/json
          group: person-topic-group
        student-topic-out:
          binder: defaultKafka
          destination: student-topic
          contentType: application/json
You are only setting the (default) properties for producer bindings.
That said, this still doesn't work for me:
binders:
  defaultKafka:
    type: kafka
    environment:
      spring:
        cloud:
          stream:
            kafka:
              default:
                producer:
                  topic:
                    properties:
                      retention.bytes: 300000000
                      segment.bytes: 300000000
                consumer:
                  topic:
                    properties:
                      retention.bytes: 300000000
                      segment.bytes: 300000000
(the properties are not applied even to the primary topic).
It looks like there is a problem with the default Kafka consumer binding properties.
This works for me; the properties are applied to both the primary and dead letter topics:
spring:
  cloud:
    stream:
      kafka:
        bindings:
          person-topic-in:
            consumer:
              enableDlq: true
              dlqName: person-topic-error-dlq
              topic:
                properties:
                  retention.bytes: 300000000
                  segment.bytes: 300000000
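
For reference, here is a rough sketch of how the whole file might look with the binding-level consumer properties merged into the configuration from the question. It only recombines the fragments already shown; the commented-out `retention.ms` line is my own addition (an assumption, not something I tested here) for the case where time-based rather than size-based retention is wanted, with 7 days as an arbitrary example value.

```yaml
spring:
  cloud:
    stream:
      kafka:
        bindings:
          person-topic-in:
            consumer:
              enableDlq: true
              dlqName: person-topic-error-dlq
              # Binding-level topic properties; in the test above these were
              # applied to both person-topic and person-topic-error-dlq.
              topic:
                properties:
                  retention.bytes: 300000000
                  segment.bytes: 300000000
                  # Assumption: time-based retention, 7 days in milliseconds.
                  # retention.ms: 604800000
      binders:
        defaultKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    default:
                      # Defaults for producer bindings only (student-topic).
                      producer:
                        topic:
                          properties:
                            retention.bytes: 300000000
                            segment.bytes: 300000000
                    binder:
                      brokers: localhost:19092
      bindings:
        person-topic-in:
          binder: defaultKafka
          destination: person-topic
          contentType: application/json
          group: person-topic-group
        student-topic-out:
          binder: defaultKafka
          destination: student-topic
          contentType: application/json
```

As far as I know, `topic.properties` is used when the binder provisions new topics, so a DLQ topic that already exists on the broker may need its configuration altered separately (or the topic recreated) before the new retention settings show up.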