I am using Spring Cloud 2023.0.1 (Spring Cloud Stream 4.1.1), and I have written a simple Kafka consumer in batch mode to simulate an error scenario.
@Bean
Consumer<Message<List<String>>> consumer1() {
    return message -> {
        final List<String> payload = message.getPayload();
        final MessageHeaders messageHeaders = message.getHeaders();
        payload.forEach(System.out::println);
        payload.forEach(p -> {
            if (p.startsWith("a")) {
                throw new RuntimeException("Intentional Exception");
            }
        });
        System.out.println(messageHeaders);
        System.out.println("Done");
    };
}
My application.yml file looks like this:
spring:
  cloud:
    function:
      definition: consumer1;
    stream:
      bindings:
        consumer1-in-0:
          destination: topic1
          group: consumer1-in-0-v0.1
          consumer:
            batch-mode: true
            use-native-decoding: true
            max-attempts: 3
      kafka:
        binder:
          brokers:
            - localhost:9092
        default:
          consumer:
            configuration:
              max.poll.records: 1000
              max.partition.fetch.bytes: 31457280
              fetch.max.wait.ms: 200
        bindings:
          consumer1-in-0:
            consumer:
              enableDlq: true
              dlqName: dlq-topic
              dlqProducerProperties:
                configuration:
                  value.serializer: org.apache.kafka.common.serialization.StringSerializer
                  key.serializer: org.apache.kafka.common.serialization.StringSerializer
              configuration:
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
I have also specified a ListenerContainerWithDlqAndRetryCustomizer to customize the retries:
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
    return new ListenerContainerWithDlqAndRetryCustomizer() {

        @Override
        public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                String group,
                @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                @Nullable BackOff backOff) {
            ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
                    dlqDestinationResolver);
            container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
        }

        @Override
        public boolean retryAndDlqInBinding(String destinationName, String group) {
            return false;
        }
    };
}
The issue: when an error happens, the message batch goes straight to the DLQ and no retries are attempted.
However, a batch can fail because of a transient error, so I want it to be retried a few times before it is sent to the DLQ. I have not been able to get this to work.
What am I doing wrong?
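For reference, the behaviour being asked for can be sketched without Spring or Kafka. The following is only a library-free simulation, not binder code: the names `BatchRetrySketch`, `deliver` and `maxRetries` are hypothetical, and the "DLQ" is just a returned list. It shows the whole batch being re-delivered to the handler a fixed number of times before it is dead-lettered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Library-free simulation of "retry the whole batch, then dead-letter it".
// All names here are hypothetical and exist only for illustration.
final class BatchRetrySketch {

    /** Outcome of one delivery: how often the handler ran and what was dead-lettered. */
    record Result(int attempts, List<String> deadLettered) {}

    static Result deliver(List<String> batch, Consumer<List<String>> handler, int maxRetries) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                handler.accept(batch);
                // Success: nothing goes to the dead-letter list.
                return new Result(attempts, List.of());
            } catch (RuntimeException e) {
                // Give up once the initial attempt plus maxRetries retries have all failed.
                if (attempts > maxRetries) {
                    return new Result(attempts, new ArrayList<>(batch));
                }
            }
        }
    }

    public static void main(String[] args) {
        // Same failure rule as the consumer in the question: records starting with "a" throw.
        Consumer<List<String>> handler = records -> records.forEach(r -> {
            if (r.startsWith("a")) {
                throw new RuntimeException("Intentional Exception");
            }
        });

        Result failed = deliver(List.of("b1", "a1"), handler, 2);
        System.out.println(failed.attempts() + " attempts, dead-lettered " + failed.deadLettered());

        Result ok = deliver(List.of("b1", "c1"), handler, 2);
        System.out.println(ok.attempts() + " attempts, dead-lettered " + ok.deadLettered());
    }
}
```

With two retries allowed, the failing batch is handed to the handler three times and then dead-lettered as a whole, while the clean batch is processed once.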
In case anyone stumbles upon this in the future, I figured out what was wrong.
I had to remove enableDlq, dlqName and dlqProducerProperties from the application.yml file.
Then it worked.
In the Java code, I also removed ListenerContainerWithDlqAndRetryCustomizer and just used ListenerContainerCustomizer.
The code looked something like this:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
    return (container, dest, group) -> container.setCommonErrorHandler(errorHandler);
}

@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    // FixedBackOff(interval in ms, max retries): retry up to 4 times with no delay, then recover to the DLQ.
    return new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0, 4));
}

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations<?, ?> stringTemplate,
        KafkaOperations<?, ?> bytesTemplate,
        KafkaOperations<?, ?> longTemplate) {
    // The recoverer picks a template based on the type of the failed record's value.
    Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    templates.put(Long.class, longTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf,
            Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
}

@Bean
public KafkaTemplate<String, String> bytesTemplate(ProducerFactory<String, String> pf) {
    // No overrides: this template uses the serializers configured on the producer factory.
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, Long> longTemplate(ProducerFactory<String, Long> pf) {
    return new KafkaTemplate<>(pf,
            Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class,
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
}
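To make the `new FixedBackOff(0, 4)` argument above concrete: the first value is the wait between attempts in milliseconds, and the second is the maximum number of retries, so a failing batch is delivered up to five times in total before the recoverer is called. The following is a minimal stand-in written in plain Java, assuming the same "return the interval until the retries are used up, then signal stop" contract. `FixedBackOffSketch` and `schedule` are hypothetical names, not Spring classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in that mimics a fixed back-off; not the Spring class itself.
final class FixedBackOffSketch {

    /** Sentinel meaning "no more retries". */
    static final long STOP = -1;

    private final long intervalMs;
    private final long maxRetries;
    private long used = 0;

    FixedBackOffSketch(long intervalMs, long maxRetries) {
        this.intervalMs = intervalMs;
        this.maxRetries = maxRetries;
    }

    /** Returns the wait before the next retry, or STOP once all retries are used. */
    long nextBackOff() {
        if (used < maxRetries) {
            used++;
            return intervalMs;
        }
        return STOP;
    }

    /** Collects every wait the back-off hands out before it stops. */
    static List<Long> schedule(long intervalMs, long maxRetries) {
        FixedBackOffSketch backOff = new FixedBackOffSketch(intervalMs, maxRetries);
        List<Long> waits = new ArrayList<>();
        for (long wait = backOff.nextBackOff(); wait != STOP; wait = backOff.nextBackOff()) {
            waits.add(wait);
        }
        return waits;
    }

    public static void main(String[] args) {
        List<Long> waits = schedule(0, 4);
        // One initial delivery plus one delivery per retry.
        System.out.println("retries=" + waits.size()
                + ", deliveries=" + (waits.size() + 1)
                + ", waits=" + waits);
    }
}
```

For transient errors, a non-zero interval is usually more useful than 0, because it gives the failing dependency time to recover between attempts.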