java spring-jms

JMS MessageListenerContainer.stop() called but subscriber count doesn't decrease


I have a Spring Boot app that needs to dynamically subscribe to and unsubscribe from a JMS topic (non-durable). Subscribing works, and according to the code so does unsubscribing, but the subscriber count on the JMS topic doesn't decrease. What am I missing?

Here's some stripped down code:

@Component
@Scope("prototype")
public class DataStreamController {

    @Autowired
    @Qualifier("pubsubNonDurableJmsListenerContainer")
    private DefaultJmsListenerContainerFactory listenerContainerFactory;
    @Autowired
    private JmsListenerEndpointRegistry jmsListenerEndpointRegistry;
    private DataStreamListener listener;
    private String customer;
    private final String uuidStream1 = UUID.randomUUID().toString();
    private final String uuidStream2 = UUID.randomUUID().toString();

    public DataStreamController(String customer, DataStreamListener listener) {
        this.customer = customer;
        this.listener = listener;
    }

    @PostConstruct
    public void init() {
        subscribeToTopics();
    }

    public void subscribeToTopics() {
        SimpleJmsListenerEndpoint stream1TopicEndpoint = new SimpleJmsListenerEndpoint();
        stream1TopicEndpoint.setDestination("stream1." + customer);
        stream1TopicEndpoint.setId(uuidStream1);
        stream1TopicEndpoint.setMessageListener(message ->
            onStream1(message));

        SimpleJmsListenerEndpoint stream2TopicEndpoint = new SimpleJmsListenerEndpoint();
        stream2TopicEndpoint.setDestination("stream2." + customer);
        stream2TopicEndpoint.setId(uuidStream2);
        stream2TopicEndpoint.setMessageListener(message ->
            onStream2(message));

        jmsListenerEndpointRegistry.registerListenerContainer(stream1TopicEndpoint, listenerContainerFactory, true);
        jmsListenerEndpointRegistry.registerListenerContainer(stream2TopicEndpoint, listenerContainerFactory, true);
    }

    public void onStream1(final Message message) {
        // do some stuff
        listener.onData(/* json object */);
    }

    public void onStream2(final Message message) {
        // do some stuff
        listener.onData(/* json object */);
    }

    public void close() {
        jmsListenerEndpointRegistry.getListenerContainer(uuidStream1).stop();
        jmsListenerEndpointRegistry.getListenerContainer(uuidStream2).stop();
        logger.info("stream1 isRunning? " + jmsListenerEndpointRegistry.getListenerContainer(uuidStream1).isRunning());
        logger.info("stream2 isRunning? " + jmsListenerEndpointRegistry.getListenerContainer(uuidStream2).isRunning());
    }
}

When I call close(), my logs say "stream1 isRunning? false", and yet (as stated earlier) the subscriber is still active on the topic.


Solution

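  • Stopping the container only stops it from listening; it does not close the consumer, so the broker still counts the subscription. You need to call shutdown() to close the consumer. Note that shutdown() is not part of the MessageListenerContainer interface returned by the registry; it is available on the concrete container class (for example DefaultMessageListenerContainer), so a cast is needed.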

    See also: Not able to stop an IBM MQ JMS consumer in Spring Boot
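
To make the stop/shutdown distinction concrete, here is a toy model in plain Java with no Spring or JMS on the classpath. ToyTopic and ToyContainer are hypothetical stand-ins invented for this illustration, not Spring classes; they only mimic the lifecycle described above, where stop() pauses listening and shutdown() releases the consumer.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a broker-side topic: it only tracks who is subscribed.
class ToyTopic {
    private final Set<ToyContainer> subscribers = new HashSet<>();

    void subscribe(ToyContainer c) { subscribers.add(c); }
    void unsubscribe(ToyContainer c) { subscribers.remove(c); }
    int subscriberCount() { return subscribers.size(); }
}

// Hypothetical stand-in for a listener container (NOT Spring's class).
class ToyContainer {
    private final ToyTopic topic;
    private boolean running;
    private boolean consumerOpen;

    ToyContainer(ToyTopic topic) { this.topic = topic; }

    // Opens the consumer (subscribes) on first start and begins listening.
    void start() {
        if (!consumerOpen) {
            topic.subscribe(this);
            consumerOpen = true;
        }
        running = true;
    }

    // Pauses listening only; the consumer stays open on the topic.
    void stop() { running = false; }

    // Stops listening and closes the consumer, removing the subscription.
    void shutdown() {
        running = false;
        if (consumerOpen) {
            topic.unsubscribe(this);
            consumerOpen = false;
        }
    }

    boolean isRunning() { return running; }
}

class StopVersusShutdownDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        ToyContainer container = new ToyContainer(topic);
        container.start();

        container.stop();
        // prints: after stop: running=false, subscribers=1
        System.out.println("after stop: running=" + container.isRunning()
                + ", subscribers=" + topic.subscriberCount());

        container.shutdown();
        // prints: after shutdown: subscribers=0
        System.out.println("after shutdown: subscribers=" + topic.subscriberCount());
    }
}
```

This matches the symptom in the question: isRunning() reports false after stop(), yet the subscriber count is unchanged. In the question's close() method, the equivalent change would presumably be to cast the result of jmsListenerEndpointRegistry.getListenerContainer(...) to DefaultMessageListenerContainer (the type a DefaultJmsListenerContainerFactory creates) and call shutdown() on it instead of relying on stop() alone.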