javaspring-bootspring-integrationstomp

Can same StompSessionManager instance be used for different destinations?


I have one spring integration flow with StompInboundChannelAdapter listening for incoming messages from one destination and another integration flow with StompMessageHandler for sending messages to a different destination. Can I use the same instance of StompSessionManager for both? Or each flow should have its own instance? The STOMP Server is the same.

I tried with a singleton instance with the following configuration and it seems to work, but I don't know if this is the correct way or I'm missing something:

@Configuration
public class StompSessionManagerConfiguration {

  @Value("${host}")
  private String host;

  @Value("${port}")
  private Integer port;

  @Value("${login}")
  private String login;

  @Value("${passcode}")
  private String passcode;

  @Bean
  public StompSessionManager stompSessionManager() {
    ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient(host, port);
    ReactorNettyTcpStompSessionManager stompSessionManager = 
        new ReactorNettyTcpStompSessionManager(stompClient);
    stompSessionManager.setConnectHeaders(connectHeaders());
    return stompSessionManager;
  }

  public StompHeaders connectHeaders() {
    StompHeaders connectHeaders = new StompHeaders();
    connectHeaders.setLogin(login);
    connectHeaders.setPasscode(passcode);
    return connectHeaders;
  }
}
@Configuration
public class IncomingMessageFlowConfiguration {

  @Autowired
  private StompSessionManager stompSessionManager;

  @Bean
  public IntegrationFlow incomingMessageFlow() {
    return IntegrationFlow.from(stompInboundChannelAdapter())
        .channel("incomingMessageChannel").get();
  }

  public StompInboundChannelAdapter stompInboundChannelAdapter() {
    StompInboundChannelAdapter adapter = 
        new StompInboundChannelAdapter(stompSessionManager, "incomingDestination");
    adapter.setPayloadType(byte[].class);
    return adapter;
  }
}
@Configuration
public class OutgoingMessageFlowConfiguration {

  @Autowired
  private StompSessionManager stompSessionManager;

  @Bean
  public IntegrationFlow outgoingMessageFlow() {
    return IntegrationFlow.from("outgoingMessageChannel")
        .handle(stompMessageHandler()).get();
  }

  public StompMessageHandler stompMessageHandler() {
    StompMessageHandler stompMessageHandler = new StompMessageHandler(stompSessionManager);
    stompMessageHandler.setDestination("outgoingDestination");
    return stompMessageHandler;
  }
}

Solution

  • That's correct. You can share StompSessionManager between different endpoints. Its goal is to manage a single client session connected to STOMP broker.

    We even have an integration test in the project where we share StompSessionManager:

        @Bean
        public StompSessionManager stompSessionManager() {
            AbstractStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient);
            stompSessionManager.setAutoReceipt(true);
            stompSessionManager.setRecoveryInterval(500);
            return stompSessionManager;
        }
    
        @Bean
        public PollableChannel stompInputChannel() {
            return new QueueChannel();
        }
    
        @Bean
        public StompInboundChannelAdapter stompInboundChannelAdapter() {
            StompInboundChannelAdapter adapter =
                    new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
            adapter.setOutputChannel(stompInputChannel());
            return adapter;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "stompOutputChannel")
        public MessageHandler stompMessageHandler() {
            StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
            handler.setDestination("/topic/myTopic");
            handler.setConnectTimeout(1000);
            return handler;
        }
    

    See more info in the source code: https://github.com/spring-projects/spring-integration/blob/main/spring-integration-stomp/src/test/java/org/springframework/integration/stomp/client/StompServerIntegrationTests.java