The test consists of two test cases:
@Nested
inner class Test1 {
    @Test
    fun test1() {
        mypublisher.publishInTransaction(topic1) // see log1
        // check listener1 // see log2
    }
}

@Nested
inner class Test2 {
    @Test
    fun test2() {
        mypublisher.publishInTransaction(topic2) // see log3
        // check listener2
    }
}

fun MyPublisher.publishInTransaction(topic: String) {
    // producerOnlyKafkaTemplate is a KafkaTemplate instance
    producerOnlyKafkaTemplate.executeInTransaction { operations ->
        operations.send(topic, xxxx) // xxxx: record value (elided)
    }
}
test1 runs before test2, and the logs below are in chronological order.
log1
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] beginTransaction() []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] send(ProducerRecord())
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] commitTransaction() []
2023-05-30 12:14:26.913 [Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] close(PT5S) []
log2
[Processor-0-C-1] o.s.k.core.DefaultKafkaProducerFactory : Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@671afc6b] []
xxx (this happened inside the listener, and the verification passed)
log3
It reuses the producer created in log1 (KafkaProducer@58bb6ba7); the gap is about 31 seconds (12:14:26.913 to 12:14:58.206).
2023-05-30 12:14:58.206 [Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] beginTransaction() []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] send(ProducerRecord(xx))
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] commitTransaction() []
[kafka-network-thread-test-txn-TX-0] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-ruis-MacBook-Pro-2.local-test-txn-TX-0, transactionalId=ruis-MacBook-Pro-2.local-test-txn-TX-0] Enqueuing transactional request AddPartitionsToTxnRequestData(transactionalId='ruis-MacBook-Pro-2.local-test-txn-TX-0', producerId=0, producerEpoch=0, topics=[AddPartitionsToTxnTopic(name="*", partitions=[0])]) []
Sending ADD_PARTITIONS_TO_TXN request with header RequestHeader
Received ADD_PARTITIONS_TO_TXN response from node 0 for request with xxx results=[AddPartitionsToTxnPartitionResult(partitionIndex=0, errorCode=90)])]) []
o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-ruis-MacBook-Pro-2.local-test-txn-TX-0, transactionalId=ruis-MacBook-Pro-2.local-test-txn-TX-0] Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] close(PT5S) []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory : Error during some operation; producer removed from cache: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []
It looks to me as if producer@58bb6ba7 from log1 has committed and received a newer epoch, yet test2 (log3) reuses it with a lower epoch. What could be the problem?
Environment: spring-kafka-test 3.0.6
Updated (2023/May/31)
In this case I see that the MyProducerOnlyFactory bean is created twice (it should be a singleton), so the transactional ID prefix myproducer-txnid is linked to two MyProducerOnlyFactory instances:
1. With MyProducerOnlyFactory-1, I get myproducer-txnid-0, commit the transaction, and return the producer to the cache.
2. Later, in another listener thread, a KafkaTemplate backed by MyProducerOnlyFactory-2 gets a producer with the same transactional ID, myproducer-txnid-0, commits a transaction, and the producer epoch is bumped to 1.
3. After that, MyProducerOnlyFactory-1 hands out its cached myproducer-txnid-0 again. That producer still assumes epoch 0, so its commit fails with ProducerFencedException, as expected.
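The fencing sequence described in the update can be reproduced with a small plain-Kotlin model. This is a simplified sketch under stated assumptions, not Kafka's or Spring Kafka's implementation: TransactionCoordinator, SimProducer, SimProducerFactory and ProducerFencedSimException are hypothetical names. It assumes, as in Kafka, that every initialization of a transactional ID bumps that ID's epoch and that the coordinator rejects a commit carrying an older epoch; it also assumes each factory instance keeps its own producer cache.

```kotlin
// Hypothetical, simplified model of Kafka transactional-ID fencing.
// None of these classes exist in Kafka or Spring Kafka.

class ProducerFencedSimException(msg: String) : RuntimeException(msg)

class TransactionCoordinator {
    // Latest epoch handed out per transactional ID
    private val epochs = mutableMapOf<String, Int>()

    // Models initTransactions(): each call bumps the epoch for the ID
    fun initTransactions(txnId: String): Int {
        val next = (epochs[txnId] ?: -1) + 1
        epochs[txnId] = next
        return next
    }

    // Rejects a commit from a producer whose epoch is older than the latest one
    fun commit(txnId: String, epoch: Int) {
        val latest = epochs.getValue(txnId)
        if (epoch < latest) {
            throw ProducerFencedSimException("txnId=$txnId epoch=$epoch fenced by epoch=$latest")
        }
    }
}

// A producer remembers the epoch it received when it was initialized
class SimProducer(private val coordinator: TransactionCoordinator, val txnId: String) {
    private val epoch = coordinator.initTransactions(txnId)
    fun commitTransaction() = coordinator.commit(txnId, epoch)
}

// Each factory instance has its own cache, keyed by transactional ID
class SimProducerFactory(private val coordinator: TransactionCoordinator, private val prefix: String) {
    private val cache = mutableMapOf<String, SimProducer>()
    fun createProducer(): SimProducer {
        val id = prefix + "0"
        return cache.getOrPut(id) { SimProducer(coordinator, id) }
    }
}

// Two factory instances; by default they share one prefix (the "bean created twice" case)
fun runScenario(prefix2: String = "myproducer-txnid-"): String {
    val coordinator = TransactionCoordinator()
    val factory1 = SimProducerFactory(coordinator, "myproducer-txnid-")
    val factory2 = SimProducerFactory(coordinator, prefix2)

    factory1.createProducer().commitTransaction() // epoch 0: commit succeeds
    factory2.createProducer().commitTransaction() // new producer; with a shared prefix it gets epoch 1
    return try {
        factory1.createProducer().commitTransaction() // cached producer, still at epoch 0
        "committed"
    } catch (e: ProducerFencedSimException) {
        "fenced: ${e.message}"
    }
}

fun main() {
    println(runScenario())                 // fenced: txnId=myproducer-txnid-0 epoch=0 fenced by epoch=1
    println(runScenario("other-txnid-"))   // committed
}
```

In this model the clash disappears either when only one factory instance exists or when the two instances use different prefixes, because the stale epoch-0 producer is then never superseded.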
I fixed the issue with:
@NestedTestConfiguration(NestedTestConfiguration.EnclosingConfiguration.OVERRIDE)
@SpringBootTest(xxx)
class MyNestedIntegrationTest {
//xxx
}
With that, I can see that the MyProducerOnlyFactory bean is initialised only once.
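Why can a singleton bean be created twice? One possible explanation, which is an assumption and not confirmed by the logs above: singleton scope in Spring is per ApplicationContext, and the TestContext framework caches and reuses contexts keyed by the merged test configuration, so a test class whose merged configuration differs gets its own context and its own copy of every singleton. The plain-Kotlin model below (SimContext, SimContextCache and countFactories are hypothetical names, not Spring classes) illustrates only that counting effect.

```kotlin
// Hypothetical, simplified model of per-context singletons plus a context cache
// keyed by a "merged configuration" string. Not Spring's implementation.

class SimContext(val configKey: String) {
    private val singletons = mutableMapOf<String, Any>()

    // Creates a bean at most once per context, then returns the same instance
    @Suppress("UNCHECKED_CAST")
    fun <T : Any> getBean(name: String, factory: () -> T): T =
        singletons.getOrPut(name) { factory() } as T
}

class SimContextCache {
    private val contexts = mutableMapOf<String, SimContext>()

    // Same configuration key -> cached context; new key -> brand-new context
    fun contextFor(configKey: String): SimContext =
        contexts.getOrPut(configKey) { SimContext(configKey) }
}

// Counts how many "producer factory" singletons get created across all contexts
fun countFactories(configKeys: List<String>): Int {
    var created = 0
    val cache = SimContextCache()
    for (key in configKeys) {
        cache.contextFor(key).getBean("myProducerOnlyFactory") {
            created++
            Any()
        }
    }
    return created
}

fun main() {
    // Outer and nested test classes resolve to the same configuration
    println(countFactories(listOf("outer", "outer", "outer")))   // 1
    // Nested classes resolve to a different configuration than the outer class
    println(countFactories(listOf("outer", "nested", "nested"))) // 2
}
```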