spring-bootapache-kafkaspring-kafkakafka-transactions-api

spring kafka producer initiated transaction not working as described in spring docs?


I am using spring boot 3.3 and spring kafka 3.2 and wanted to test the transaction synchronization part as described in the spring docs: https://docs.spring.io/spring-kafka/reference/kafka/transactions.html#transaction-synchronization

Producer config:

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<Long, PersonDto> personProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                LongSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);
        configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-person");
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<Long, PersonDto> personKafkaTemplate() {
        return new KafkaTemplate<>(personProducerFactory());
    }

    @Bean
    public KafkaTransactionManager personKafkaTransactionManager() {
        KafkaTransactionManager manager = new KafkaTransactionManager(personProducerFactory());
        return manager;
    }

 
    @Bean
    @Primary
    public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
        return new JpaTransactionManager(entityManagerFactory);
    }
}

As you can see my producer is transactional.

My service:

@Service
public class PersonKafkaService {

    @Autowired
    private PersonJpaService personJpaService;

    @Autowired
    private KafkaTemplate<Long, PersonDto> personKafkaTemplate;

    @Transactional
    public void processPersonDetails(PersonDetailsDto personDetails) {
        PersonDto person = personJpaService.save(personDetails);
        personKafkaTemplate.send(OUT_TOPIC, person.id(), person);
    }
}

My logging config from application.yml:

logging:
  level.root: error
  level:
    org.apache.kafka: ERROR
    org.springframework.orm.jpa.JpaTransactionManager: trace
    org.springframework.kafka.transaction: trace

The log output is the following:

2024-06-14T14:03:14.595+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager        : Found thread-bound EntityManager [SessionImpl(571954782<open>)] for JPA transaction
2024-06-14T14:03:14.595+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager        : Creating new transaction with name [ing.hub.ingHub.service.PersonKafkaService.processPersonDetails]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2024-06-14T14:03:14.597+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager        : Exposing JPA transaction as JDBC [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@31ec2652]
2024-06-14T14:03:14.597+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager        : Found thread-bound EntityManager [SessionImpl(571954782<open>)] for JPA transaction
2024-06-14T14:03:14.597+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager        : Participating in existing transaction
2024-06-14T14:03:14.599+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager        : Found thread-bound EntityManager [SessionImpl(571954782<open>)] for JPA transaction
2024-06-14T14:03:14.599+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager        : Participating in existing transaction
2024-06-14T14:03:14.799+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager        : Initiating transaction commit
2024-06-14T14:03:14.800+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager        : Committing JPA transaction on EntityManager [SessionImpl(571954782<open>)]
2024-06-14T14:03:14.816+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager        : Not closing pre-bound JPA EntityManager after transaction

I would have expected to see some logs also about the KafkaTransactionManager ( like I do see for the transactional listener container ) which describes starting the transaction, committing, etc but there is no such thing telling me a kafka transaction is being involved at all.

Am I missing something or is this not working as expected?


Solution

  • I think this is working as expected. When synchronizing with an external transaction manager like the JpaTransactionManager, this becomes the primary transaction manager by default, and the KafkaTransactionManager synchronizes with the JpaTransactionManager. When your @Transactional method exits, the PlatformTransactionManager in Spring will ask the primary txn manager which is JpaTransactionManager to commit the transaction. After the commit is done, it calls the triggerAfterCommit(..) method which checks for any transactions that are synchronized with the primary transaction via the TransactionSynchronization. Spring for Apache Kafka provides an extension of Spring's ResourceHolderSynchronization (which is an implementation of TransactionSynchronization) that overrides the processResourceAfterCommit method, which calls the DefaultKafkaProducerFactory#commit() method, that delegates to the commit call in the Kafka producer.

    If you enable TRACE level logging on the package org.springframework.kafka.core, you will see something like the following in the logs:

    o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@4002348f] commitTransaction()
    

    In addition, if you turn TRACE on the kafka transaction packages, you will see a lot of commit activity from the Kafka transaction stack.

    org.apache.kafka.clients.producer: TRACE
    org.apache.kafka.clients.producer.internals: TRACE