We are using transactional producers, and sometimes there is no traffic for more than 7 days, which leads to the loss of the transactional id metadata on the broker. The first write after this period always fails with:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
Our workaround for now is to restart all instances in Kubernetes.
Reviewing the documentation, I see that version 2.5.8 added a new maxAge property, which must be set to less than 7 days so that the metadata can be refreshed on the broker side. In my testing, however, I have not been able to make it work as expected.
I have set up a demo project to simulate the behavior. Unless I am making some error on my part, maxAge does not refresh the metadata before transactional.id.expiration.ms elapses.
Versions used:
The transaction metadata is forced to expire after 10 seconds:
KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 10000
KAFKA_PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS: 100
KAFKA_PRODUCER_ID_EXPIRATION_MS: 10000
KAFKA_TRANSACTION_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS: 10000
KAFKA_TRANSACTION_REMOVE_EXPIRED_TRANSACTION_CLEANUP_INTERVAL_MS: 10000
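To confirm that the broker actually picked up these overrides, the values can be read back with the Kafka AdminClient. This is a minimal sketch; the bootstrap address and the broker id "1" are assumptions for a local single-broker setup:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class BrokerConfigCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1"); // assumed broker id
            Config config = admin.describeConfigs(Collections.singleton(broker)).all().get().get(broker);
            // should print 10000 if the override above was applied
            System.out.println(config.get("transactional.id.expiration.ms").value());
        }
    }
}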
I set maxAge to 7 seconds, expecting the producer metadata to be refreshed before the 10-second expiration and thereby gain another 10 seconds:
@Configuration
@Slf4j
public class KafkaConfig {

    @Bean
    public DefaultKafkaProducerFactoryCustomizer producerFactoryCustomizer() {
        return producerFactory -> producerFactory.setMaxAge(Duration.ofSeconds(7));
    }
}
Test controller to produce a message on demand:
@RestController
public class TestController {

    private AtomicInteger semaphore = new AtomicInteger(0);

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping(value = "/test")
    public void sequentialSimple() {
        streamBridge.send("sendTestData-out-0", "testMessage_" + semaphore.getAndIncrement());
    }
}
I expected never to encounter the InvalidPidMappingException, but I always see it:
2023-11-19T13:22:57.234+01:00 DEBUG 28648 --- [fix_localhost-0] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-transactionTestPrefix_localhost-0, transactionalId=transactionTestPrefix_localhost-0] Transition from state COMMITTING_TRANSACTION to error state ABORTABLE_ERROR
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
If I don't handle the ABORTABLE_ERROR (with an AfterRollbackProcessor) or a DLQ, I also lose messages. What am I doing wrong?
GitHub demo project: https://github.com/Fonexn/kafkaMaxAgeTesting
Thanks
SOLUTION
Set maxAge on each binder's transactional producer factory in this way, for example:
@Autowired
private BinderFactory binderFactory;

@Bean
KafkaTransactionManager<byte[], byte[]> customKafkaTransactionManager() {
    KafkaMessageChannelBinder kafka =
            (KafkaMessageChannelBinder) this.binderFactory.getBinder("kafka1", MessageChannel.class);
    DefaultKafkaProducerFactory<byte[], byte[]> producerFactory =
            (DefaultKafkaProducerFactory<byte[], byte[]>) kafka.getTransactionalProducerFactory();
    producerFactory.setMaxAge(Duration.ofSeconds(60));
    return new KafkaTransactionManager<>(producerFactory);
}
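If the application uses more than one Kafka binder, the same setting has to be applied to each one, since every binder has its own transactional producer factory. A minimal sketch, assuming a second binder named kafka2 exists (the binder and bean names are hypothetical):

@Bean
KafkaTransactionManager<byte[], byte[]> kafka2TransactionManager() {
    // same pattern, applied to the hypothetical "kafka2" binder
    KafkaMessageChannelBinder kafka =
            (KafkaMessageChannelBinder) this.binderFactory.getBinder("kafka2", MessageChannel.class);
    DefaultKafkaProducerFactory<byte[], byte[]> producerFactory =
            (DefaultKafkaProducerFactory<byte[], byte[]>) kafka.getTransactionalProducerFactory();
    producerFactory.setMaxAge(Duration.ofSeconds(60));
    return new KafkaTransactionManager<>(producerFactory);
}

In every case, maxAge must stay below the broker's transactional.id.expiration.ms, otherwise the broker can still expire the transactional id before a fresh producer is created.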
maxAge does not refresh the metadata. When a transactional producer is retrieved from the cache and its maxAge has been exceeded, that producer is closed and the next one in the cache is examined. This continues until a young enough producer is found or the cache is empty; if all the producers in the cache are too old, a new producer is created. See createTransactionalProducer() and expire().
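To make that behaviour concrete, here is a simplified, self-contained sketch of the expire-on-borrow logic described above. It is not the spring-kafka source; the class, record, and method names are invented for illustration:

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

public class MaxAgeCacheSketch {

    // stand-in for a cached Kafka producer plus the time it was created
    record CachedProducer(AutoCloseable producer, Instant createdAt) {}

    private final Deque<CachedProducer> cache = new ArrayDeque<>();
    private final Duration maxAge = Duration.ofSeconds(60);

    CachedProducer borrow() throws Exception {
        CachedProducer candidate;
        while ((candidate = cache.poll()) != null) {
            Duration age = Duration.between(candidate.createdAt(), Instant.now());
            if (age.compareTo(maxAge) <= 0) {
                return candidate;          // young enough: reuse it
            }
            candidate.producer().close();  // too old: close it and look at the next one
        }
        return createNew();                // cache empty or all entries expired
    }

    private CachedProducer createNew() {
        // stand-in for createTransactionalProducer(): a fresh producer with a new timestamp
        return new CachedProducer(() -> {}, Instant.now());
    }
}

The important point is that nothing happens while a producer sits idle in the cache; the age check only runs the next time a producer is borrowed, which is why maxAge alone cannot prevent the broker-side expiration during a long idle period.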