I have a Spring Boot app which needs to dynamically subscribe and unsubscribe (non-durable) to a JMS topic. The subscription all works and so does the unsubscribe (according to the code) but the subscriber count on the JMS topic doesn't decrease. What is it I'm missing/not understanding?
Here's some stripped down code:
@Component
@Scope("prototype")
public class DataStreamController {
@Autowired
@Qualifier("pubsubNonDurableJmsListenerContainer")
private DefaultJmsListenerContainerFactory listenerContainerFactory;
@Autowired
private JmsListenerEndpointRegistry jmsListenerEndpointRegistry;
private DataStreamListener listener;
private String customer;
private final String uuidStream1 = UUID.randomUUID().toString();
private final String uuidStream2 = UUID.randomUUID().toString();
public DataStreamController(String customer, DataStreamListener listener) {
this.customer = customer;
this.listener = listener;
}
@PostConstruct
public void init() {
subscribeToTopics();
}
public void subscribeToTopics() {
SimpleJmsListenerEndpoint stream1TopicEndpoint = new SimpleJmsListenerEndpoint();
stream1TopicEndpoint.setDestination("stream1." + customer);
stream1TopicEndpoint.setId(uuidStream1);
stream1TopicEndPoint.setMessageListener(message ->
onStream1(message);
SimpleJmsListenerEndpoint stream2TopicEndpoint = new SimpleJmsListenerEndpoint();
stream2TopicEndpoint.setDestination("stream2." + customer);
stream2TopicEndpoint.setId(uuidStream2);
stream2TopicEndPoint.setMessageListener(message ->
onStream2(message);
jmsListenerEndpointRegistry.registerListenerContainer(stream1TopicEndpoint, listenerContainerFactory, true);
jmsListenerEndpointRegistry.registerListenerContainer(stream2TopicEndpoint, listenerContainerFactory, true);
}
public void onStream1(final Message message) {
// do some stuff
listener.onData(// json object);
}
public void onStream2(final Message message) {
// do some stuff
listener.onData(// json object);
}
public void close() {
jmsListenerEndpointRegistry.getListenerContainer(uuidStream1).stop();
jmsListenerEndpointRegistry.getListenerContainer(uuidStream2).stop();
logger.info("stream1 isRunning? " + jmsListenerEndpointRegistry.getListenerContainer(uuidStream1).isRunning());
logger.info("stream2 isRunning? " + jmsListenerEndpointRegistry.getListenerContainer(uuidStream2).isRunning());
}
}
When calling close()
my logs say: stream1 isRunning? false
and yet (as stated earlier) the subscriber is still active.
Stopping the container just stops the container from listening, it does not close the consumer. You need to call shutDown()
to close the consumer.