spring-bootjmsactivemq-classicdistributed-transactionsdurable-subscription

Spring boot 3 + ActiveMQ + XA-Transactions + Durable Subscription to JMS Topic dows not work properly


Currently, I can't get XA-Transactions AND Durable Subscription working at the same time. A Demo Projekt is here: https://github.com/proxora/Spring-boot-3-ActiveMQ-XA-Transactions-Durable-Subscription-to-JMS-Topic

I tried atomikos and narayana as XA-Transaction managers, but both failed to work with ActiveMQ when durable subscription is enabled. Then the application tries to connect more than once to activemq with the same client id, and this restricts activemq.

2025-05-20T11:44:05.741+02:00 ERROR 2748380 --- [demo] [     Atomikos:3] c.a.icatch.imp.RecoveryDomainService     : Error in getting XA resource

com.atomikos.datasource.ResourceException: Error in getting XA resource
        at com.atomikos.datasource.xa.jms.JmsTransactionalResource.refreshXAConnection(JmsTransactionalResource.java:80) ~[transactions-jta-6.0.0-jakarta.jar:na]
        at com.atomikos.datasource.xa.XATransactionalResource.refreshXAResource(XATransactionalResource.java:389) ~[transactions-jta-6.0.0-jakarta.jar:na]
        at com.atomikos.datasource.xa.XATransactionalResource.getXAResource(XATransactionalResource.java:226) ~[transactions-jta-6.0.0-jakarta.jar:na]
        at com.atomikos.datasource.xa.XATransactionalResource.recover(XATransactionalResource.java:373) ~[transactions-jta-6.0.0-jakarta.jar:na]
        at com.atomikos.icatch.imp.RecoveryDomainService.performRecovery(RecoveryDomainService.java:82) ~[transactions-6.0.0.jar:na]
        at com.atomikos.icatch.imp.RecoveryDomainService$1.alarm(RecoveryDomainService.java:57) ~[transactions-6.0.0.jar:na]
        at com.atomikos.timing.PooledAlarmTimer.notifyListeners(PooledAlarmTimer.java:101) ~[atomikos-util-6.0.0.jar:na]
        at com.atomikos.timing.PooledAlarmTimer.run(PooledAlarmTimer.java:88) ~[atomikos-util-6.0.0.jar:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: jakarta.jms.InvalidClientIDException: Broker: localhost - Client: my-unique-client-id already connected from tcp://192.168.48.1:45798
        at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:265) ~[na:na]
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:230) ~[na:na]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99) ~[na:na]
        at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:119) ~[na:na]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99) ~[na:na]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99) ~[na:na]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99) ~[na:na]
        at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:854) ~[na:na]
        at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77) ~[na:na]
        at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:337) ~[na:na]
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:201) ~[na:na]
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:302) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:234) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:216) ~[activemq-client-6.1.6.jar:6.1.6]
2025-05-20T11:54:51.865+02:00 ERROR 2767634 --- [demo] [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.springframework.jms.InvalidClientIDException: Broker: localhost - Client: my-unique-client-id already connected from tcp://192.168.48.1:48538] with root cause

jakarta.jms.InvalidClientIDException: Broker: localhost - Client: my-unique-client-id already connected from tcp://192.168.48.1:48538
        at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:265) ~[na:na]
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:230) ~[na:na]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99) ~[na:na]
        at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:119) ~[na:na]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99) ~[na:na]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99) ~[na:na]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99) ~[na:na]
        at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:854) ~[na:na]
        at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77) ~[na:na]
        at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:337) ~[na:na]
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:201) ~[na:na]
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:302) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:234) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:216) ~[activemq-client-6.1.6.jar:6.1.6]
        at java.lang.Thread.run(Unknown Source) ~[na:na]
2025-05-20T11:56:34.504+02:00 ERROR 2770489 --- [demo] [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.springframework.jms.connection.SynchedLocalTransactionFailedException: Local JMS transaction failed to commit] with root cause

jakarta.jms.IllegalStateException: The session has already been closed
        at org.messaginghub.pooled.jms.JmsPoolSession.safeGetSessionHolder(JmsPoolSession.java:596) ~[pooled-jms-3.1.7.jar:na]
        at org.messaginghub.pooled.jms.JmsPoolSession.getInternalSession(JmsPoolSession.java:483) ~[pooled-jms-3.1.7.jar:na]
        at org.messaginghub.pooled.jms.JmsPoolSession.commit(JmsPoolSession.java:294) ~[pooled-jms-3.1.7.jar:na]
        at org.springframework.jms.connection.JmsResourceHolder.commitAll(JmsResourceHolder.java:241) ~[spring-jms-6.2.6.jar:6.2.6]
        at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:439) ~[spring-jms-6.2.6.jar:6.2.6]
        at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:419) ~[spring-jms-6.2.6.jar:6.2.6]
        at org.springframework.transaction.support.ResourceHolderSynchronization.afterCommit(ResourceHolderSynchronization.java:87) ~[spring-tx-6.2.6.jar:6.2.6]
        at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit(TransactionSynchronizationUtils.java:165) ~[spring-tx-6.2.6.jar:6.2.6]
        at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerAfterCommit(TransactionSynchronizationUtils.java:153) ~[spring-tx-6.2.6.jar:6.2.6]
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCommit(AbstractPlatformTransactionManager.java:1006) ~[spring-tx-6.2.6.jar:6.2.6]
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:836) ~[spring-tx-6.2.6.jar:6.2.6]
        at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:758) ~[spring-tx-6.2.6.jar:6.2.6]
        at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:698) ~[spring-tx-6.2.6.jar:6.2.6]
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:416) ~[spring-tx-6.2.6.jar:6.2.6]
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-6.2.6.jar:6.2.6]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.2.6.jar:6.2.6]
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:728) ~[spring-aop-6.2.6.jar:6.2.6]
        at com.example.demo.UserController$$SpringCGLIB$$0.createUser(<generated>) ~[main/:na]
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) ~[na:na]
        at java.base/java.lang.reflect.Method.invoke(Method.java:580) ~[na:na]
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:258) ~[spring-web-6.2.6.jar:6.2.6]
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:191) ~[spring-web-6.2.6.jar:6.2.6]
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:118) ~[spring-webmvc-6.2.6.jar:6.2.6]
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:986) ~[spring-webmvc-6.2.6.jar:6.2.6]
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:891) ~[spring-webmvc-6.2.6.jar:6.2.6]
        at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-6.2.6.jar:6.2.6]
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1089) ~[spring-webmvc-6.2.6.jar:6.2.6]
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:979) ~[spring-webmvc-6.2.6.jar:6.2.6]
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014) ~[spring-webmvc-6.2.6.jar:6.2.6]
        at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:903) ~[spring-webmvc-6.2.6.jar:6.2.6]
        at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:564) ~[tomcat-embed-core-10.1.40.jar:6.0]
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:885) ~[spring-webmvc-6.2.6.jar:6.2.6]
        at jakarta.servlet.http.HttpServlet.service(HttpServlet.java:658) ~[tomcat-embed-core-10.1.40.jar:6.0]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:195) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) ~[tomcat-embed-websocket-10.1.40.jar:10.1.40]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-6.2.6.jar:6.2.6]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.2.6.jar:6.2.6]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-6.2.6.jar:6.2.6]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.2.6.jar:6.2.6]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-6.2.6.jar:6.2.6]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:116) ~[spring-web-6.2.6.jar:6.2.6]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:164) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:140) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:167) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:483) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:116) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:344) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:398) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:903) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1740) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1189) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:658) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:63) ~[tomcat-embed-core-10.1.40.jar:10.1.40]
        at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
        
        
AND PERIODICALLY:        

2025-05-20T12:00:51.338+02:00  WARN 2770489 --- [demo] [riodic Recovery] com.arjuna.ats.jta                       : ARJUNA016116: Failed to create JMS connection

jakarta.jms.InvalidClientIDException: Broker: localhost - Client: my-unique-client-id already connected from tcp://192.168.48.1:50764
        at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:265) ~[na:na]
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:230) ~[na:na]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99) ~[na:na]
        at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:119) ~[na:na]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99) ~[na:na]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99) ~[na:na]
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99) ~[na:na]
        at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:854) ~[na:na]
        at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77) ~[na:na]
        at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:337) ~[na:na]
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:201) ~[na:na]
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:302) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:234) ~[activemq-client-6.1.6.jar:6.1.6]
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:216) ~[activemq-client-6.1.6.jar:6.1.6]
        at java.lang.Thread.run(Unknown Source) ~[na:na]

Here is my current setup, which is the relevant parts of the demo project https://github.com/proxora/Spring-boot-3-ActiveMQ-XA-Transactions-Durable-Subscription-to-JMS-Topic

dependencies:

plugins {
    id("org.springframework.boot") version "3.4.5"
}
dependencies {
  implementation("org.springframework.boot:spring-boot-starter-activemq")
  runtimeOnly("com.atomikos:transactions-spring-boot3-starter:6.0.0")
  //runtimeOnly("dev.snowdrop:narayana-spring-boot-starter:3.4.0")
}

configuration / application.properties:

spring.jms.pub-sub-domain=true
spring.jms.subscription-durable=true
spring.jms.client-id=my-unique-client-id
spring.jms.template.session.transacted=true

spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin

Java Config:

    @Bean
    ActiveMQConnectionFactoryCustomizer activeMQConnectionFactoryCustomizer(@Value("${spring.jms.client-id}") String clientId) {
        return connectionFactory -> {
            log.info("Configuring JMS connection factory: {}", connectionFactory.getClass().getName());
            connectionFactory.setClientID(clientId);
        };
    }

    @Bean
    MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

JMS Consumer and Producer:

    @Transactional
    @EventListener
    public void on(UserChangedEvent event) {
        log.info("Sending MSG to JMS destinations with connectionFactory='{}',  connectionFactoryClass='{}' , isSessionTransacted='{}'  <{}> ...",
                jmsTopicTemplate.getConnectionFactory(), jmsTopicTemplate.getConnectionFactory().getClass().getName(), jmsTopicTemplate.isSessionTransacted(), event);
        jmsTopicTemplate.convertAndSend("my_user_topic", event);
        log.info("sending done");
    }

    @Transactional
    @JmsListener(destination = "my_user_topic")
    public void receiveUserChangedEventFromTopic(UserChangedEvent event) {
        log.info("Received MSG From Topic Listener 1 <{}>", event);
        userRepository.findById(event.getUser().getId())
                .ifPresent(user -> log.info("loaded user from DB :<{}>", user));
    }

Can anyone help me setup this components in a Spring Boot 3 Application:

I expect a working application without Exceptions / ERROR or WARN logs complaining about InvalidClientIDException: Broker: localhost - Client: my-unique-client-id already connected from tcp://192.168.48.1:50764


Solution

  • You have an error about having multiple connections using the same clientId value. The broker will reject the second connection:

    Client: my-unique-client-id already connected from tcp://192.168.48.1:48538
    

    Also, check into using Virtual Topics. They provide the ability for an app to publish to a topic and have multiple subscribers read from a queue. This approach provides more features than shared durable topic subscriptions, and is more straight-forward to rationalize the in a cluster of brokers.