I'm using Spring Cloud Stream (version 4.0.3) with the Kafka binder in a Spring Boot application to consume messages in batches from a Kafka topic. When an exception is thrown, the entire batch is sent to the DLQ topic without any retries. Please help me find the issue.
Below is my retry and DLQ configuration:
@Configuration
public class KafkaRetryConfig {

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(
            KafkaOperations<Object, Object> bytesTemplate) {
        return (container, destinationName, group) -> {
            container.setCommonErrorHandler(new DefaultErrorHandler(
                    new DeadLetterPublishingRecoverer(bytesTemplate), new FixedBackOff(5000L, 5L)));
        };
    }
}
Below is the Kafka consumer code:
@Bean
public Consumer<Message<List<records>>> recordsConsumer() {
    return message -> {
        List<records> records = message.getPayload();
        int index = IntStream.range(0, records.size())
                .filter(streamIndex -> records.get(streamIndex).getId().equals("abc123"))
                .findFirst()
                .orElse(-1);
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        assert acknowledgment != null;
        try {
            if (index > -1) {
                throw new RuntimeException("runtime exception");
            }
            // message processing logic
            acknowledgment.acknowledge();
        }
        catch (Exception e) {
            throw new BatchListenerFailedException(records.get(index).toString(), index);
        }
    };
}
Below are my application properties:
spring:
  cloud:
    stream:
      default-binder: kafka
      default:
        contentType: application/*+avro
        consumer:
          useNativeDecoding: true
          autoStartup: false
        producer:
          useNativeEncoding: true
      kafka:
        binder:
          autoCreateTopics: false
          brokers: broker
          configuration:
            enable:
              auto.commit: false
              idempotence: true
            max.in.flight.requests.per.connection: 1
            request.timeout.ms: 5000
            security.protocol: SASL_SSL
            sasl:
              kerberos:
                service:
                  name: service-name
              jaas:
                config: com.sun.security.auth.module.Krb5LoginModule required
                  doNotPrompt=true
                  useKeyTab=true
                  useTicketCache=false
                  storeKey=true
                  keyTab="xyz.keytab"
                  principal="principal@PAYCHEX.COM";
            ssl:
              endpoint.identification.algorithm:
              truststore:
                type: JKS
                location: /config/global/payx-cacerts/cacerts
                password: changeit
          consumer-properties:
            client.id: hrs-productsubscription-consumer-test-9
            key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true
            schema.registry.url: schema-registry-url
            #fetch.max.wait.ms: 60000
            max.poll.records: 200
          requiredAcks: -1
          producer-properties:
            key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        bindings:
          recordsConsumer-in-0:
            consumer:
              #ackMode: MANUAL
              startOffset: earliest
              resetOffsets: false
              autoCommitOffset: false
              enableDlq: true
              dlqName: dlq-topic-name
              dlqPartitions: 1
              dlqProducerProperties:
                configuration:
                  key.serializer: org.apache.kafka.common.serialization.StringSerializer
                  value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                  schema.registry.url: schema-registry-url
              configuration:
                group.id: group-id
                schema.registry.url: schema-registry-url
                autoStartup: true
                key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                spring:
                  deserializer:
                    key:
                      delegate.class: org.apache.kafka.common.serialization.StringDeserializer
                    value:
                      delegate.class: io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer
      bindings:
        recordsConsumer-in-0:
          consumer:
            batch-mode: true
            max-attempts: 2
          destination: topic-name
          group: group-name
          partitioned: true
          concurrency: 8
The retry configuration in my app doesn't seem to be working.
Please review the answer on this SO thread and see if it is related: Spring Cloud Stream Kafka Binder - Retries not working when using DLQ in Batch Mode
It looks like this is due to similar issues. Please update to the latest version of the binder (4.1.2), as there were some fixes in this area on the 4.1.x line.
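If it helps, here is a minimal sketch of what that version bump can look like, assuming a Gradle build with the Kotlin DSL and the binder version declared directly (with Maven, or when the version is managed through the Spring Cloud release-train BOM, bump the equivalent coordinate there instead):
// build.gradle.kts -- illustrative fragment, not a complete build file
dependencies {
    // 4.1.2 is the version suggested above; it pulls the matching
    // spring-cloud-stream 4.1.x core transitively.
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka:4.1.2")
}
After bumping, rebuild and re-run the failing batch scenario to confirm the configured FixedBackOff retries happen before the batch is published to the DLQ.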