I am using spring boot 3.3 and spring kafka 3.2 and wanted to test the transaction synchronization part as described in the spring docs: https://docs.spring.io/spring-kafka/reference/kafka/transactions.html#transaction-synchronization
Producer config:
@Configuration
public class KafkaProducerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<Long, PersonDto> personProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
LongSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-person");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<Long, PersonDto> personKafkaTemplate() {
return new KafkaTemplate<>(personProducerFactory());
}
@Bean
public KafkaTransactionManager personKafkaTransactionManager() {
KafkaTransactionManager manager = new KafkaTransactionManager(personProducerFactory());
return manager;
}
@Bean
@Primary
public JpaTransactionManager transactionManager(EntityManagerFactory entityManagerFactory) {
return new JpaTransactionManager(entityManagerFactory);
}
}
As you can see my producer is transactional.
My service:
@Service
public class PersonKafkaService {
@Autowired
private PersonJpaService personJpaService;
@Autowired
private KafkaTemplate<Long, PersonDto> personKafkaTemplate;
@Transactional
public void processPersonDetails(PersonDetailsDto personDetails) {
PersonDto person = personJpaService.save(personDetails);
personKafkaTemplate.send(OUT_TOPIC, person.id(), person);
}
}
My logging config from application.yml:
logging:
level.root: error
level:
org.apache.kafka: ERROR
org.springframework.orm.jpa.JpaTransactionManager: trace
org.springframework.kafka.transaction: trace
The log output is the following:
2024-06-14T14:03:14.595+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager : Found thread-bound EntityManager [SessionImpl(571954782<open>)] for JPA transaction
2024-06-14T14:03:14.595+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager : Creating new transaction with name [ing.hub.ingHub.service.PersonKafkaService.processPersonDetails]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2024-06-14T14:03:14.597+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager : Exposing JPA transaction as JDBC [org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle@31ec2652]
2024-06-14T14:03:14.597+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager : Found thread-bound EntityManager [SessionImpl(571954782<open>)] for JPA transaction
2024-06-14T14:03:14.597+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager : Participating in existing transaction
2024-06-14T14:03:14.599+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager : Found thread-bound EntityManager [SessionImpl(571954782<open>)] for JPA transaction
2024-06-14T14:03:14.599+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager : Participating in existing transaction
2024-06-14T14:03:14.799+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager : Initiating transaction commit
2024-06-14T14:03:14.800+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager : Committing JPA transaction on EntityManager [SessionImpl(571954782<open>)]
2024-06-14T14:03:14.816+03:00 DEBUG 30914 --- [nio-8080-exec-2] o.s.orm.jpa.JpaTransactionManager : Not closing pre-bound JPA EntityManager after transaction
I would have expected to see some logs also about the KafkaTransactionManager ( like I do see for the transactional listener container ) which describes starting the transaction, committing, etc but there is no such thing telling me a kafka transaction is being involved at all.
Am I missing something or is this not working as expected?
I think this is working as expected. When synchronizing with an external transaction manager like the JpaTransactionManager,
this becomes the primary transaction manager by default, and the KafkaTransactionManager
synchronizes with the JpaTransactionManager.
When your @Transactional
method exits, the PlatformTransactionManager
in Spring will ask the primary txn manager which is JpaTransactionManager
to commit the transaction. After the commit is done, it calls the triggerAfterCommit(..) method which checks for any transactions that are synchronized with the primary transaction via the TransactionSynchronization
. Spring for Apache Kafka provides an extension of Spring's ResourceHolderSynchronization (which is an implementation of TransactionSynchronization
) that overrides the processResourceAfterCommit
method, which calls the DefaultKafkaProducerFactory#commit()
method, that delegates to the commit call in the Kafka producer.
If you enable TRACE
level logging on the package org.springframework.kafka.core
, you will see something like the following in the logs:
o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@4002348f] commitTransaction()
In addition, if you turn TRACE
on the kafka transaction packages, you will see a lot of commit activity from the Kafka transaction stack.
org.apache.kafka.clients.producer: TRACE
org.apache.kafka.clients.producer.internals: TRACE