I'm using spring-kafka with @RetryableTopic and I can't understand how it works with different groupIds.
For example, the topic TestTopic contains 5 messages and is listened to by 2 consumers, groupId_1 and groupId_2.
groupId_1 failed to process message #2 and sent it to TestTopic_1.
groupId_2 failed to process message #4 and sent it to TestTopic_1.
When groupId_1 and groupId_2 then read from TestTopic_1, it turns out that groupId_1 reads both messages #2 and #4 (is #4 received again?), and groupId_2 also reads message #2 (is it repeated?) and #4. How does this work? How can I avoid re-processing?
I also have a question about configuring the backoff for @RetryableTopic. For example, I have this setting:
@RetryableTopic(
    attempts = "3",
    topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
    backoff = @Backoff(delay = 1000, maxDelay = 5_000, random = true),
    dltTopicSuffix = "dead-two"
)
Do I understand correctly that the thread reading from the partition is blocked for the delay time, after which it sends the message to the retry topic for re-reading? Or is a separate thread created for this waiting and sending?
And if the application crashes during this delay, will the message not be sent to the topic TestTopic_N?
And why is maxDelay needed if delay and attempts are specified?
1. See this section in the documentation (https://docs.spring.io/spring-kafka/docs/current/reference/html/#multi-retry); you can simply set a different retryTopicSuffix and dltTopicSuffix on each listener.
2. The thread is not blocked; the record is immediately written to the next retry topic. When the consumer on that topic receives the record, it looks at the timestamp and, if it is too early, rejects the message. The partition is paused on that consumer until the time expires, and then it is resumed and the record is redelivered.
3. See #2.
4. maxDelay is only used with an exponential back off (with a multiplier); e.g. delay = 2, multiplier = 2.0, maxDelay = 10 would result in 2, 4, 8, 10, 10... up to max attempts.
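The delay arithmetic and the "too early" check described above can be modeled in a few lines of plain Java. This is only a rough sketch of the idea, not Spring Kafka's actual implementation: the class RetryBackoffSketch and the methods delays and remainingPauseMillis are hypothetical names made up for this illustration, and the due time is assumed to be a simple epoch-millisecond value carried with the record.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model of the timing logic; none of these names exist in Spring Kafka.
class RetryBackoffSketch {

    // Exponential back off: start at 'delay', multiply after each retry,
    // and never exceed 'maxDelay'.
    static List<Long> delays(long delay, double multiplier, long maxDelay, int count) {
        List<Long> result = new ArrayList<>();
        double current = delay;
        for (int i = 0; i < count; i++) {
            result.add((long) Math.min(current, maxDelay));
            current *= multiplier;
        }
        return result;
    }

    // Model of the check made by the consumer on the retry topic: if the record
    // arrives before its due time, this is how long the partition stays paused.
    // A result of 0 means the record can be processed immediately.
    static long remainingPauseMillis(long dueAtMillis, long nowMillis) {
        return Math.max(0L, dueAtMillis - nowMillis);
    }

    public static void main(String[] args) {
        // delay = 2, multiplier = 2.0, maxDelay = 10
        System.out.println(delays(2, 2.0, 10, 5));             // [2, 4, 8, 10, 10]
        System.out.println(remainingPauseMillis(1_500, 1_000)); // 500 (too early)
        System.out.println(remainingPauseMillis(1_500, 2_000)); // 0 (already due)
    }
}
```

In this model nothing sleeps on the thread that published the record: the waiting is expressed only as a pause duration on the consumer of the retry topic, which matches the behaviour described in the answer.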
EDIT
Further to the comments below, here is an example of using existing topics for the retry topics:
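import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.retrytopic.DestinationTopic.Properties;
import org.springframework.kafka.retrytopic.RetryTopicComponentFactory;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory.RetryTopicNamesProvider;
import org.springframework.kafka.retrytopic.SuffixingRetryTopicNamesProviderFactory.SuffixingRetryTopicNamesProvider;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;

@SpringBootApplication
@EnableScheduling
public class So77216081Application extends RetryTopicConfigurationSupport {

    public static void main(String[] args) {
        SpringApplication.run(So77216081Application.class, args);
    }

    // Plug in the custom provider that decides the retry topic names.
    @Override
    protected RetryTopicComponentFactory createComponentFactory() {
        return new RetryTopicComponentFactory() {

            @Override
            public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
                return new MyNamesProvider();
            }

        };
    }

    // Send one test record to the main topic on startup.
    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so77216081", "foo");
        };
    }

}

@Component
class Listener {

    // Always fails, so the record travels through every retry topic to the DLT.
    @KafkaListener(id = "so77216081", topics = "so77216081")
    @RetryableTopic(attempts = "3", autoCreateTopics = "false")
    void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        System.out.println(in + " from " + topic);
        throw new RuntimeException("test");
    }

    @DltHandler
    void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        System.out.println(in + " from " + topic);
    }

}

class MyNamesProvider implements RetryTopicNamesProviderFactory {

    @Override
    public RetryTopicNamesProvider createRetryTopicNamesProvider(Properties properties) {
        return new SuffixingRetryTopicNamesProvider(properties) {

            @Override
            public String getTopicName(String topic) {
                // Keep the default names for the main topic and the DLT.
                if (properties.isMainEndpoint() || properties.isDltTopic()) {
                    return super.getTopicName(topic);
                }
                // Map the default retry topic names onto the existing topics.
                else if (super.getTopicName(topic).endsWith("-0")) {
                    return "TestFirstrandomtopic";
                }
                else if (super.getTopicName(topic).endsWith("-1")) {
                    return "TestSecondrandomtopic";
                }
                else {
                    throw new IllegalStateException("Shouldn't get here - attempts is only 3");
                }
            }

        };
    }

}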
foo from so77216081
foo from TestFirstrandomtopic
foo from TestSecondrandomtopic
foo from so77216081-dlt
Note that since autoCreateTopics is false, all 4 topics must have previously been added to the brokers.