I'm following this recipe to handle deserialisation errors: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/main/recipes/recipe-3-handling-deserialization-errors-dlq-kafka.adoc
I created the beans mentioned in the recipe above as:
@Configuration
@Slf4j
public class ErrorHandlingConfig {

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
        return (container, dest, group) -> {
            container.setErrorHandler(errorHandler);
        };
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
    }

    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
        return new DeadLetterPublishingRecoverer(bytesTemplate);
    }
}
Configuration file:
spring:
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
        consumer:
          useNativeDecoding: true
      bindings:
        myInboundRoute:
          destination: some-destination.1
          group: a-custom-group
        myOutboundRoute:
          destination: some-destination.2
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          configuration:
            application:
              security: PLAINTEXT
        bindings:
          myInboundRoute:
            consumer:
              autoCommitOffset: true
              startOffset: latest
              enableDlq: true
              dlqName: my-dql.poison
              dlqProducerProperties:
                configuration:
                  value.serializer: myapp.serde.MyCustomSerializer
              configuration:
                value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                spring.deserializer.value.delegate.class: myapp.serde.MyCustomSerializer
          myOutboundRoute:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: myapp.serde.MyCustomSerializer
I was expecting the DLT to be called my-dql.poison. This topic is in fact created fine, but a second topic called some-destination.1.DLT is also auto-created.
Why is it created in addition to the one I named in the config with dlqName?
What am I doing wrong? When I poll for messages, the failed message is in the auto-created some-destination.1.DLT and not in the topic named by dlqName.
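You should not configure DLT processing in the binding (enableDlq, dlqName) if you configure the SeekToCurrentErrorHandler (STCEH) in the container. Also set maxAttempts=1 on the binding's consumer to disable retries in the binder.
By default, the DeadLetterPublishingRecoverer (DLPR) publishes to a topic named after the original topic with .DLT appended, which is why the record ends up in some-destination.1.DLT. To use a different name, you need to configure a destination resolver in the DLPR.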
/**
 * Create an instance with the provided template and destination resolving function,
 * that receives the failed consumer record and the exception and returns a
 * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
 * 0, no partition is set when publishing to the topic.
 * @param template the {@link KafkaOperations} to use for publishing.
 * @param destinationResolver the resolving function.
 */
public DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> template,
        BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
    this(Collections.singletonMap(Object.class, template), destinationResolver);
}
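As a minimal sketch of how that constructor could be used, the publisher bean from the question might be rewritten along these lines. The class name DlqNameConfig and the DLQ_TOPIC constant are hypothetical, the topic name is simply the dlqName from the question, and the negative partition relies on the Javadoc above (no partition is set when it is less than 0). It has not been run against the asker's setup.

```java
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

// Hypothetical configuration class; in the question this bean lives in ErrorHandlingConfig.
@Configuration
public class DlqNameConfig {

    // Assumption: the desired dead-letter topic is the dlqName from the question.
    private static final String DLQ_TOPIC = "my-dql.poison";

    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations<?, ?> bytesTemplate) {
        // The resolver receives the failed record and the exception and returns
        // the target topic. A partition below 0 means no partition is set on publish.
        return new DeadLetterPublishingRecoverer(bytesTemplate,
                (record, exception) -> new TopicPartition(DLQ_TOPIC, -1));
    }
}
```

Returning the same topic for every record is the simplest choice here; the resolver could instead inspect record.topic() or the exception type if different failures should go to different topics.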
See https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
There is an open issue to configure the DLPR with the binding's DLT name.
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1031