apache-kafka spring-kafka spring-transactions kafka-transactions-api

Why does my Spring Kafka unit test run into ProducerFencedException almost every time?


The test consists of two test cases:

@Nested
inner class Test1 {
   @Test
   fun test1() {
     mypublisher.publishInTransaction(topic1)  // see log1
     // check listener1  // see log2
   }
}

@Nested
inner class Test2 {
   @Test
   fun test2() {
     mypublisher.publishInTransaction(topic2)  // see log3
     // check listener2
   }
}

fun MyPublisher.publishInTransaction(topic: String) {
  // producerOnlyKafkaTemplate is a KafkaTemplate instance
  producerOnlyKafkaTemplate.executeInTransaction {
    producerOnlyKafkaTemplate.send(xxxx)
  }
}

test1 runs before test2; the logs below are in chronological order.

log1

[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] beginTransaction() []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] send(ProducerRecord())
[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] commitTransaction() []
 2023-05-30 12:14:26.913  [Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] close(PT5S) []

log2

[Processor-0-C-1] o.s.k.core.DefaultKafkaProducerFactory   : Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@671afc6b] []
xxx (this happened inside the listener, and the verification passed)

log3

It reuses the producer created in log1; the gap is around 32 seconds.

 2023-05-30 12:14:58.206  [Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] beginTransaction() []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] send(ProducerRecord(xx))

[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] commitTransaction() []


[kafka-network-thread-test-txn-TX-0] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-ruis-MacBook-Pro-2.local-test-txn-TX-0, transactionalId=ruis-MacBook-Pro-2.local-test-txn-TX-0] Enqueuing transactional request AddPartitionsToTxnRequestData(transactionalId='ruis-MacBook-Pro-2.local-test-txn-TX-0', producerId=0, producerEpoch=0, topics=[AddPartitionsToTxnTopic(name="*", partitions=[0])]) []

Sending ADD_PARTITIONS_TO_TXN request with header RequestHeader

Received ADD_PARTITIONS_TO_TXN response from node 0 for request with xxx results=[AddPartitionsToTxnPartitionResult(partitionIndex=0, errorCode=90)])]) []

o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-ruis-MacBook-Pro-2.local-test-txn-TX-0, transactionalId=ruis-MacBook-Pro-2.local-test-txn-TX-0] Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one. []


[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : commitTransaction failed: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] close(PT5S) []
[Test worker] o.s.k.core.DefaultKafkaProducerFactory   : Error during some operation; producer removed from cache: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@58bb6ba7] []

It looks to me as if producer@58bb6ba7 from log1 has committed and a newer epoch now exists for its transactional ID, yet test2 (log3) reuses that producer with the lower epoch. What could be the problem?

The environment is:

spring-kafka-test 3.0.6

Update (2023-05-31)

In this case, I found that the MyProducerOnlyFactory bean is created twice (it should be a singleton), so the transactional ID myproducer-txnid ends up linked to two MyProducerOnlyFactory instances.

With MyProducerOnlyFactory-1, I get myproducer-txnid-0, commit the transaction, and return the producer to the cache.

Later, in a listener thread, a KafkaTemplate backed by MyProducerOnlyFactory-2 finds the above producer instance in the cache, commits a transaction, and updates the producer epoch to 1.

After that, MyProducerOnlyFactory-1 gets myproducer-txnid-0 back and still thinks the epoch is 0, so its commit fails with ProducerFencedException.

I fixed the issue with @NestedTestConfiguration:

@NestedTestConfiguration(NestedTestConfiguration.EnclosingConfiguration.OVERRIDE)
@SpringBootTest(xxx)
class MyNestedIntegrationTest {
  //xxx
}

With that, I can see that the MyProducerOnlyFactory bean is initialised only once.


Solution

  • In this case, I found that the MyProducerOnlyFactory bean is created twice (it should be a singleton), so the transactional ID myproducer-txnid ends up linked to two MyProducerOnlyFactory instances.

    With MyProducerOnlyFactory-1, I get myproducer-txnid-0, commit the transaction, and return the producer to the cache.

    Later, in a listener thread, a KafkaTemplate backed by MyProducerOnlyFactory-2 finds the above producer instance in the cache, commits a transaction, and updates the producer epoch to 1.

    After that, MyProducerOnlyFactory-1 gets myproducer-txnid-0 back and still thinks the epoch is 0, so its commit fails with ProducerFencedException.

    I fixed the issue with @NestedTestConfiguration:

    @NestedTestConfiguration(NestedTestConfiguration.EnclosingConfiguration.OVERRIDE)
    @SpringBootTest(xxx)
    class MyNestedIntegrationTest {
      //xxx
    }
    

    With that, I can see that the MyProducerOnlyFactory bean is initialised only once.
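
To make the epoch reasoning above easier to follow, here is a toy model in plain Kotlin. It is only a sketch under stated assumptions: ToyCoordinator, ToyProducer, ToyFactory and lastCommitIsFenced are hypothetical names invented for illustration, not Kafka or Spring Kafka APIs, and the model assumes that each factory instance keeps its own producer cache and that registering a second producer for the same transactional ID bumps the epoch. The real broker protocol is considerably more involved.

```kotlin
// Toy model only: none of these classes exist in Kafka or Spring Kafka.

class FencedException(message: String) : RuntimeException(message)

/** Stands in for the broker-side transaction coordinator. */
class ToyCoordinator {
    private val epochs = mutableMapOf<String, Int>()

    /** Registering a producer bumps the epoch of its ID (the first registration yields 0). */
    fun register(transactionalId: String): Int {
        val next = (epochs[transactionalId] ?: -1) + 1
        epochs[transactionalId] = next
        return next
    }

    /** A commit is accepted only from the producer that holds the latest epoch. */
    fun commit(transactionalId: String, epoch: Int) {
        val current = epochs[transactionalId]
        if (epoch != current) {
            throw FencedException("$transactionalId: epoch $epoch is stale, current is $current")
        }
    }
}

/** A producer remembers the epoch it was given when it registered. */
class ToyProducer(private val coordinator: ToyCoordinator, val transactionalId: String) {
    val epoch: Int = coordinator.register(transactionalId)

    fun commit() = coordinator.commit(transactionalId, epoch)
}

/** Stands in for a producer factory that caches one producer per transactional ID. */
class ToyFactory(private val coordinator: ToyCoordinator) {
    private val cache = mutableMapOf<String, ToyProducer>()

    fun producerFor(transactionalId: String): ToyProducer =
        cache.getOrPut(transactionalId) { ToyProducer(coordinator, transactionalId) }
}

/**
 * Replays the sequence from the logs: test1 publishes, the listener publishes,
 * then test2 publishes. Returns true if the final commit is fenced.
 */
fun lastCommitIsFenced(testFactory: ToyFactory, listenerFactory: ToyFactory): Boolean {
    val id = "myproducer-txnid-0"
    testFactory.producerFor(id).commit()      // test1: first registration, accepted
    listenerFactory.producerFor(id).commit()  // listener: a second factory registers a new producer
    return try {
        testFactory.producerFor(id).commit()  // test2: reuses the producer cached by testFactory
        false
    } catch (e: FencedException) {
        true
    }
}
```

In this model, passing two different ToyFactory instances that share one ToyCoordinator makes lastCommitIsFenced return true, because the second factory registers its own producer and leaves the first one with a stale epoch. Passing the same ToyFactory instance for both arguments returns false, which mirrors the effect of having the factory bean initialised only once.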