I don't understand one moment with a recoverer in the transaction mode. All the classes below are just for example to understand the transactional nature of recoverer.
I have a listener that throws an exception:
@KafkaListener(topics = "messages")
public void handle(@Payload String data) {
if ("error".equalsIgnoreCase(data)) {
throw new RuntimeException("error has occurred in MessageHandler");
}
}
I have the bean of AfterRollbackProcessor
:
@Bean
public AfterRollbackProcessor<?, ?> afterRollbackProcessor(ConsumerRecordRecoverer recoverer,
KafkaTemplate<?, ?> kafkaTemplate) {
return new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(2000, 0), kafkaTemplate, true);
}
And finally I have a recoverer:
private final KafkaTemplate<Integer, String> kafkaTemplate;
private final ThreadLocal<Integer> i = new ThreadLocal<>();
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void accept(ConsumerRecord<?, ?> consumerRecord, Exception e) {
String data = (String) consumerRecord.value();
Integer j = i.get();
j = j == null ? 0 : j;
log.error("ErrorTopicRecoverer send {}", j);
kafkaTemplate.send("errors", data + "_" + j);
if (j < 2) {;
i.set(++j);
throw new RuntimeException("error in ErrorTopicRecoverer", e);
}
i.set(0);
}
My application.yml:
spring:
kafka:
bootstrap-servers:
- http://127.0.0.1:9092
- http://127.0.0.1:9192
- http://127.0.0.1:9292
producer:
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
transaction-id-prefix: tx-
consumer:
group-id: test-group
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
isolation-level: read_committed
enable-auto-commit: false
listener:
concurrency: 3
ack-mode: record
The listener is transactional. I throw an exception in the recoverer two times in a row after kafkaTemplate.send()
. My expectation was kafkaTemplate.send()
would rollback two times and on the third time would commit.
Without @Transactional
I received 3 messages in the errors topic. kafkaTemplate.send()
committed 3 times.
With @Transactional
without any parameters I received a various number of messages in the errors topic (could be 3, 2, 1). kafkaTemplate.send()
could commit from 1 to 3 times.
With @Transactional(propagation = Propagation.REQUIRES_NEW)
I received exactly one message in the errors topic. kafkaTemplate.send()
committed one time.
I assumed that the recoverer would be executed in the same transaction as the listener had started in. But without @Transactional
in the recoverer kafkaTemplate.send()
works anyway and doesn't rollback. How does recoverer work in transaction?
And is this right to set the propagation as REQUIRES_NEW
in the recoverer? With this value kafkaTemplate.send()
works correctly.
Your understanding is correct. The recoverer operates in its own transaction when using REQUIRES_NEW
, which is why you see the behavior you described. If you want to maintain a clear separation between the listener's transaction and the error handling logic, using REQUIRES_NEW
is a good approach.