spring spring-kafka

Kafka ConsumerRecordRecoverer with transaction


There is one thing I don't understand about how a recoverer behaves in transactional mode. All the classes below are just examples meant to illustrate the transactional nature of the recoverer.

I have a listener that throws an exception:

@KafkaListener(topics = "messages")
public void handle(@Payload String data) {
    if ("error".equalsIgnoreCase(data)) {
        throw new RuntimeException("error has occurred in MessageHandler");
    }
}

I have an AfterRollbackProcessor bean:

@Bean
public AfterRollbackProcessor<?, ?> afterRollbackProcessor(ConsumerRecordRecoverer recoverer,
                                                           KafkaTemplate<?, ?> kafkaTemplate) {
    return new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(2000, 0), kafkaTemplate, true);
}

And finally I have a recoverer:

private final KafkaTemplate<Integer, String> kafkaTemplate;

private final ThreadLocal<Integer> i = new ThreadLocal<>();

@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void accept(ConsumerRecord<?, ?> consumerRecord, Exception e) {
    String data = (String) consumerRecord.value();
    Integer j = i.get();
    j = j == null ? 0 : j;
    log.error("ErrorTopicRecoverer send {}", j);
    kafkaTemplate.send("errors", data + "_" + j);
    if (j < 2) {
        i.set(++j);
        throw new RuntimeException("error in ErrorTopicRecoverer", e);
    }
    i.set(0);
}

My application.yml:

spring:
  kafka:
    bootstrap-servers:
      - http://127.0.0.1:9092
      - http://127.0.0.1:9192
      - http://127.0.0.1:9292
    producer:
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      transaction-id-prefix: tx-
    consumer:
      group-id: test-group
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      isolation-level: read_committed
      enable-auto-commit: false
    listener:
      concurrency: 3
      ack-mode: record

The listener is transactional. I throw an exception in the recoverer twice in a row after kafkaTemplate.send(). I expected kafkaTemplate.send() to roll back twice and commit on the third attempt.
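To make that expectation concrete, here is a minimal sketch using a toy in-memory producer. It is not the Spring Kafka or Kafka client API: ToyTxProducer, recover and runAttempts are hypothetical names invented for illustration, and the sketch simply assumes that every recovery attempt runs in its own transaction that is rolled back when the recoverer throws.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a transactional producer; NOT the Spring Kafka or Kafka client API. */
class RecovererTxSketch {

    /** Records sent inside a transaction stay pending until commit. */
    static final class ToyTxProducer {
        private final List<String> pending = new ArrayList<>();
        private final List<String> committed = new ArrayList<>();

        void send(String value) { pending.add(value); }

        void commit() { committed.addAll(pending); pending.clear(); }

        void rollback() { pending.clear(); }

        /** What a read_committed consumer would be able to see. */
        List<String> visible() { return new ArrayList<>(committed); }
    }

    /** Mirrors the recoverer in the question: fails on attempts 0 and 1, succeeds on attempt 2. */
    static void recover(ToyTxProducer producer, String data, int attempt) {
        producer.send(data + "_" + attempt);
        if (attempt < 2) {
            throw new RuntimeException("error in ErrorTopicRecoverer");
        }
    }

    /** Assumption: each attempt gets its own transaction, rolled back if the recoverer throws. */
    static List<String> runAttempts(String data) {
        ToyTxProducer producer = new ToyTxProducer();
        for (int attempt = 0; attempt < 3; attempt++) {
            try {
                recover(producer, data, attempt);
                producer.commit();
            } catch (RuntimeException ex) {
                producer.rollback();
            }
        }
        return producer.visible();
    }

    public static void main(String[] args) {
        System.out.println(runAttempts("error")); // prints [error_2]
    }
}
```

Under that assumption only the third send survives, so a read_committed consumer of the errors topic would see a single record.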

Without @Transactional I received 3 messages in the errors topic. kafkaTemplate.send() committed 3 times.

With @Transactional without any parameters I received a varying number of messages in the errors topic (3, 2, or 1). kafkaTemplate.send() committed anywhere from 1 to 3 times.

With @Transactional(propagation = Propagation.REQUIRES_NEW) I received exactly one message in the errors topic. kafkaTemplate.send() committed one time.

I assumed that the recoverer would be executed in the same transaction that the listener had started. But without @Transactional on the recoverer, kafkaTemplate.send() still goes through and is not rolled back. How does the recoverer work within a transaction?

And is it right to set the propagation to REQUIRES_NEW in the recoverer? With this value kafkaTemplate.send() behaves as I expected.


Solution

  • Your understanding is correct. With REQUIRES_NEW the recoverer runs in its own transaction, separate from the listener's, so each failed attempt rolls back its own send and only the successful attempt commits; that is why you see exactly one message in the errors topic. If you want to keep the listener's transaction clearly separated from the error-handling logic, REQUIRES_NEW is a good approach.
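The separation that REQUIRES_NEW provides can be illustrated with a small toy model of the two propagation modes. This is only a sketch of the general semantics (REQUIRED joins a transaction that is already open, REQUIRES_NEW always opens an independent one); it does not use Spring's transaction manager and says nothing about how the listener container invokes the recoverer. PropagationSketch, inTransaction and run are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of REQUIRED vs REQUIRES_NEW; NOT Spring's transaction manager. */
class PropagationSketch {

    enum Propagation { REQUIRED, REQUIRES_NEW }

    private final Deque<List<String>> open = new ArrayDeque<>(); // open transactions, innermost first
    private final List<String> committed = new ArrayList<>();

    /** Buffers a record in the innermost open transaction. */
    void send(String value) {
        if (open.isEmpty()) {
            throw new IllegalStateException("no transaction in progress");
        }
        open.peek().add(value);
    }

    /** REQUIRED joins an open transaction; otherwise a new one is opened, committed on success, dropped on failure. */
    void inTransaction(Propagation propagation, Runnable work) {
        if (propagation == Propagation.REQUIRED && !open.isEmpty()) {
            work.run(); // joined: the enclosing transaction decides the outcome
            return;
        }
        open.push(new ArrayList<>());
        boolean ok = false;
        try {
            work.run();
            ok = true;
        } finally {
            List<String> tx = open.pop();
            if (ok) {
                committed.addAll(tx);
            }
        }
    }

    /** The inner block sends successfully, then the outer transaction fails. */
    static List<String> run(Propagation inner) {
        PropagationSketch tx = new PropagationSketch();
        try {
            tx.inTransaction(Propagation.REQUIRED, () -> {
                tx.inTransaction(inner, () -> tx.send("to-errors-topic"));
                throw new RuntimeException("outer work fails after the inner send");
            });
        } catch (RuntimeException ex) {
            // the outer transaction has been rolled back
        }
        return new ArrayList<>(tx.committed);
    }

    public static void main(String[] args) {
        System.out.println(run(Propagation.REQUIRED));     // prints []
        System.out.println(run(Propagation.REQUIRES_NEW)); // prints [to-errors-topic]
    }
}
```

In this model the inner send shares the fate of the outer transaction under REQUIRED, while under REQUIRES_NEW it commits or rolls back on its own.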