jmsspring-jms

How to implement multiple applications to consume from the same topic at the same time?


I have the following applications:

Publisher Server --> Publish {name} to topic name

Consumer Server A --> Consumes topic name, prints "Hello, " + name

Consumer Server B --> Consumes topic name, prints "Good bye, " + name

I'm using Spring Boot 2.7 with Spring JMS.

The expected result:

For each name published, both consumer servers consume the data, so the expected result for each published message should be (IE publish name John and then Jane):

Hello, John
Good bye, John

Hello, Jane
Good bye, Jane

The actual result:

Hello, John
Good bye, Jane

Meaning, the two consumers are alternating to consume from the topic.

The code:

Each server has the same application.yml configuration to connect with the JMS broker.

Publisher Server - publisher code:

@RequiredArgsConstructor
@Component
public class Publisher {
    private final JmsTemplate jmsTemplate;
    
    public void publish(final String name) {
        jmsTemplate.convertAndSend("name-topic", name);
    }
}

Consumer Server A - Subscriber code

@RequiredArgsConstructor
@Component
public class SubscriberA {
    @JmsListener(destination = "name-topic")
    void welcome(final String name) {
        System.out.println("Hello, " + name);
    }
}

Consumer ServerB - Subscriber code

@RequiredArgsConstructor
@Component
public class SubscriberB {
    @JmsListener(destination = "name-topic")
    void welcome(final String name) {
        System.out.println("Good bye, " + name);
    }
}

I tried to use the different properties of the @JmsListener like the concurrency property, with no luck.


Solution

  • I found the solution.

    By default, Spring JMS configuration is set to work with Queues instead of Topics.

    Creating the necessary configuration beans and setting the .setPubSubDomain(true); fixed the problem.

    Changes I needed to implement:

    Add JMS configuration:

    @RequiredArgsConstructor
    @Configuration
    @EnableJms
    public class JmsConfiguration {
    
        private final ConnectionFactory connectionFactory;
    
        @Bean
        public JmsTemplate jmsTemplate() {
            // To publish messages as topics, setPubSubDomain to true
            final JmsTemplate jmsTemplate = new JmsTemplate();
            jmsTemplate.setPubSubDomain(true);
            jmsTemplate.setConnectionFactory(connectionFactory);
            return jmsTemplate;
        }
    
        @Bean
        public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory() {
            // To subscribe to topics instead of queues, setPubSubDomain to true
            final DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new  DefaultJmsListenerContainerFactory();
            defaultJmsListenerContainerFactory.setConnectionFactory(connectionFactory);
            defaultJmsListenerContainerFactory.setSessionTransacted(true);
            defaultJmsListenerContainerFactory.setPubSubDomain(true);
            return defaultJmsListenerContainerFactory;
        }
    
    }
    

    Change the @JmsListener(s), adding the created ContainerFactory

    @RequiredArgsConstructor
    @Component
    public class SubscriberA {
        @JmsListener(destination = "name-topic", containerFactory = "defaultJmsListenerContainerFactory")
        void welcome(final String name) {
            System.out.println("Hello, " + name);
        }
    }
    
    @RequiredArgsConstructor
    @Component
    public class SubscriberB {
        @JmsListener(destination = "name-topic", containerFactory = "defaultJmsListenerContainerFactory")
        void welcome(final String name) {
            System.out.println("Good bye, " + name);
        }
    }