I have an issue using @RetryableTopic together with the OpenTracing Spring Boot instrumentation. The @RetryableTopic definition looks like this:
@RetryableTopic(
        attempts = "3",
        backoff = @Backoff(delay = 3000, multiplier = 2.0),
        topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE,
        kafkaTemplate = "dltKafkaTemplate",
        listenerContainerFactory = "retryEventListenerFactory",
        exclude = {
                DeserializationException.class,
                SerializationException.class,
                MessageConversionException.class,
                ConversionException.class,
                MethodArgumentResolutionException.class,
                NoSuchMethodException.class,
                ClassCastException.class
        }
)
@KafkaListener(
        topics = "dlt-msg-test-topic",
        containerFactory = "retryEventListenerFactory")
public void consume(String message, @Headers MessageHeaders messageHeaders) {
    LOGGER.info("Received {}", message);
    throw new RuntimeException("Test retry exception");
}

@DltHandler
public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    LOGGER.info(in + " from " + topic);
}
On its own, this sample works fine: it triggers the retries and the DLT handler. But as soon as I add tracing to the project, for example by introducing opentracing-spring-cloud-starter and its related dependencies, the following error is thrown:
java.lang.UnsupportedOperationException: This implementation doesn't support this method
at org.springframework.kafka.core.ProducerFactory.getConfigurationProperties(ProducerFactory.java:120)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.determineSendTimeout(DeadLetterPublishingRecoverer.java:661)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.verifySendResult(DeadLetterPublishingRecoverer.java:636)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:628)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:524)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:489)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:461)
at org.springframework.kafka.listener.FailedRecordProcessor.getRecoveryStrategy(FailedRecordProcessor.java:181)
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:134)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2674)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2555)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2429)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2307)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1981)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1365)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1356)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1251)
After debugging, I found that OpenTracing wraps the original consumer in a TracingKafkaConsumer, which stores the original consumer as one of its fields. So when the DefaultErrorHandler is invoked as part of the retry cycle, the TracingKafkaConsumer, rather than the original consumer, is passed into its handleRemaining method.
If I understand correctly, SeekUtils.seekOrRecover then fails because it cannot obtain the necessary parameters from the original consumer the way it does when running without tracing. My guess is that those parameters cannot be found because the original consumer is now hidden inside the TracingKafkaConsumer, and the seek logic that fails doesn't know how to reach it.
My question is: how can I get out of this situation and have both the TracingKafkaConsumer and the @RetryableTopic functionality working together correctly?
It looks like OpenTracing wraps the ProducerFactory in another factory (which, presumably, wraps the producers in tracing producers).
java.lang.UnsupportedOperationException: This implementation doesn't support this method
at org.springframework.kafka.core.ProducerFactory.getConfigurationProperties(ProducerFactory.java:120)
The producer factory wrapper should implement getConfigurationProperties() and call the delegate factory to get its properties.
I suggest you open a bug against OpenTracing to fix their wrapper so that it implements all the necessary methods by delegating to the real factory.
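To illustrate the delegation fix, here is a minimal, self-contained sketch. The `ProducerFactory` interface and the `TracingProducerFactory` class below are simplified stand-ins for the real Spring Kafka and OpenTracing types, not the actual classes; the point is only that the wrapper must forward the call instead of inheriting the interface's throwing default method:

```java
import java.util.Collections;
import java.util.Map;

// Simplified stand-in for Spring Kafka's ProducerFactory contract.
interface ProducerFactory {
    // The real interface's default implementation throws, producing the
    // UnsupportedOperationException seen in the stack trace above.
    default Map<String, Object> getConfigurationProperties() {
        throw new UnsupportedOperationException("This implementation doesn't support this method");
    }
}

// The real factory knows its configuration.
class DefaultProducerFactory implements ProducerFactory {

    private final Map<String, Object> configs;

    DefaultProducerFactory(Map<String, Object> configs) {
        this.configs = configs;
    }

    @Override
    public Map<String, Object> getConfigurationProperties() {
        return Collections.unmodifiableMap(this.configs);
    }
}

// A tracing wrapper that overrides the default method and forwards the
// call to its delegate; without this override the default throws.
class TracingProducerFactory implements ProducerFactory {

    private final ProducerFactory delegate;

    TracingProducerFactory(ProducerFactory delegate) {
        this.delegate = delegate;
    }

    @Override
    public Map<String, Object> getConfigurationProperties() {
        return this.delegate.getConfigurationProperties();
    }
}
```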
That said, all we are trying to do here is determine the send timeout setting on the producer.
Please open an issue here: https://github.com/spring-projects/spring-kafka/issues - we can be a bit more tolerant if the producer factory doesn't honor the contract, and fall back to some default timeout.
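A tolerant version of that timeout lookup might look like the sketch below. The method name, the `delivery.timeout.ms` key, and the 120-second default are illustrative assumptions, not the actual framework code:

```java
import java.time.Duration;
import java.util.Map;
import java.util.function.Supplier;

class SendTimeouts {

    // Assumed fallback when the factory's properties are unavailable.
    static final Duration DEFAULT_SEND_TIMEOUT = Duration.ofSeconds(120);

    // Determine the send timeout from the producer configs, falling back
    // to a default when the factory doesn't honor the contract.
    static Duration determineSendTimeout(Supplier<Map<String, Object>> configSupplier) {
        try {
            Object timeout = configSupplier.get().get("delivery.timeout.ms");
            if (timeout instanceof Number) {
                return Duration.ofMillis(((Number) timeout).longValue());
            }
        }
        catch (UnsupportedOperationException ex) {
            // Wrapper factory didn't implement getConfigurationProperties().
        }
        return DEFAULT_SEND_TIMEOUT;
    }
}
```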
EDIT
It looks like they already fixed it, a couple of years ago; you must be using an old version:
https://github.com/opentracing-contrib/java-kafka-client/pull/87
EDIT2
Well, the fix was merged in 2021, but it looks like there hasn't been a release since 2020.
Maybe that project is dead?