We have a spring cloud stream app using Kafka. The requirement is that on the producer side the list of messages needs to be put in a topic in a transaction. There is no consumer for the messages in the same app.
When I enabled transactions using the spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix property, I got an error saying that there is no subscriber for the dispatcher, and that the total number of partitions obtained from the topic is less than the number configured for the transaction. The app is not able to obtain the partitions for the topic in transaction mode.
Could you please tell me if I am missing anything?
You need to show your code and configuration as well as the versions you are using.
Producer-only transactions are discussed in the documentation.
Enable transactions by setting spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix to a non-empty value, e.g. tx-. When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction. When the listener exits normally, the listener container will send the offset to the transaction and commit it. A common producer factory is used for all producer bindings configured using spring.cloud.stream.kafka.binder.transaction.producer.* properties; individual binding Kafka producer properties are ignored.

If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transaction (e.g. @Scheduled method), you must get a reference to the transactional producer factory and define a KafkaTransactionManager bean using it.

    @Bean
    public PlatformTransactionManager transactionManager(BinderFactory binders) {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        return new KafkaTransactionManager<>(pf);
    }
Notice that we get a reference to the binder using the BinderFactory; use null in the first argument when there is only one binder configured. If more than one binder is configured, use the binder name to get the reference. Once we have a reference to the binder, we can obtain a reference to the ProducerFactory and create a transaction manager.

Then you would just use normal Spring transaction support, e.g. TransactionTemplate or @Transactional, for example:

    public static class Sender {

        // Every send made on this thread inside the method joins the same transaction.
        @Transactional
        public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
            stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
        }

    }
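The TransactionTemplate alternative mentioned above could look roughly like the following. This is only a sketch: it assumes the KafkaTransactionManager bean defined earlier is the PlatformTransactionManager being injected, and that the output channel is bound to the Kafka binder. TemplateSender and sendAll are hypothetical names chosen for illustration.

```java
import java.util.List;

import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

// Hypothetical helper: sends a list of payloads inside one transaction.
public class TemplateSender {

    private final TransactionTemplate template;
    private final MessageChannel output;

    public TemplateSender(PlatformTransactionManager transactionManager, MessageChannel output) {
        // Assumption: transactionManager is the KafkaTransactionManager bean shown earlier.
        this.template = new TransactionTemplate(transactionManager);
        this.output = output;
    }

    public void sendAll(List<String> stuffToSend) {
        // The callback runs in a transaction; an exception thrown here rolls it back.
        template.execute(status -> {
            stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
            return null; // no result needed
        });
    }
}
```

The sends need to happen on the thread that runs the callback; if the channel hands messages off to another thread, they would not take part in this transaction.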
If you wish to synchronize producer-only transactions with those from some other transaction manager, use a ChainedTransactionManager.
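For the configuration side, a minimal application.properties fragment might look like this. It is a sketch, not a complete configuration: tx- is just the example prefix from the documentation, and the required-acks line is an assumption about what a transactional producer needs in your setup.

```properties
# Turns on the transactional binder; any non-empty prefix works.
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx-

# Assumption: transactional producers are normally run with acks=all.
spring.cloud.stream.kafka.binder.required-acks=all
```

Producer settings for a transactional binder go under spring.cloud.stream.kafka.binder.transaction.producer.*, since binding-level Kafka producer properties are ignored in this mode.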