I have one Spring Integration flow with a StompInboundChannelAdapter listening for incoming messages from one destination, and another integration flow with a StompMessageHandler for sending messages to a different destination. Can I use the same instance of StompSessionManager for both, or should each flow have its own instance? The STOMP server is the same.
I tried a singleton instance with the following configuration and it seems to work, but I don't know whether this is the correct way or whether I'm missing something:
@Configuration
public class StompSessionManagerConfiguration {

    @Value("${host}")
    private String host;

    @Value("${port}")
    private Integer port;

    @Value("${login}")
    private String login;

    @Value("${passcode}")
    private String passcode;

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient(host, port);
        ReactorNettyTcpStompSessionManager stompSessionManager =
                new ReactorNettyTcpStompSessionManager(stompClient);
        stompSessionManager.setConnectHeaders(connectHeaders());
        return stompSessionManager;
    }

    public StompHeaders connectHeaders() {
        StompHeaders connectHeaders = new StompHeaders();
        connectHeaders.setLogin(login);
        connectHeaders.setPasscode(passcode);
        return connectHeaders;
    }
}
@Configuration
public class IncomingMessageFlowConfiguration {

    @Autowired
    private StompSessionManager stompSessionManager;

    @Bean
    public IntegrationFlow incomingMessageFlow() {
        return IntegrationFlow.from(stompInboundChannelAdapter())
                .channel("incomingMessageChannel").get();
    }

    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
                new StompInboundChannelAdapter(stompSessionManager, "incomingDestination");
        adapter.setPayloadType(byte[].class);
        return adapter;
    }
}
@Configuration
public class OutgoingMessageFlowConfiguration {

    @Autowired
    private StompSessionManager stompSessionManager;

    @Bean
    public IntegrationFlow outgoingMessageFlow() {
        return IntegrationFlow.from("outgoingMessageChannel")
                .handle(stompMessageHandler()).get();
    }

    public StompMessageHandler stompMessageHandler() {
        StompMessageHandler stompMessageHandler = new StompMessageHandler(stompSessionManager);
        stompMessageHandler.setDestination("outgoingDestination");
        return stompMessageHandler;
    }
}
That's correct. You can share a StompSessionManager between different endpoints. Its goal is to manage a single client session connected to the STOMP broker.
We even have an integration test in the project where we share a StompSessionManager:
@Bean
public StompSessionManager stompSessionManager() {
    AbstractStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient);
    stompSessionManager.setAutoReceipt(true);
    stompSessionManager.setRecoveryInterval(500);
    return stompSessionManager;
}

@Bean
public PollableChannel stompInputChannel() {
    return new QueueChannel();
}

@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
    StompInboundChannelAdapter adapter =
            new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
    adapter.setOutputChannel(stompInputChannel());
    return adapter;
}

@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
    StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
    handler.setDestination("/topic/myTopic");
    handler.setConnectTimeout(1000);
    return handler;
}
See more info in the source code: https://github.com/spring-projects/spring-integration/blob/main/spring-integration-stomp/src/test/java/org/springframework/integration/stomp/client/StompServerIntegrationTests.java
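
To make the "single client session shared by several endpoints" idea concrete, here is a minimal plain-Java sketch. It is a simplified model only, not Spring Integration's code: SessionManager, SessionListener and their methods are hypothetical names invented for illustration, and closing the session when the last listener leaves is an assumption of the sketch rather than documented behaviour of StompSessionManager.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical, simplified model; NOT Spring Integration's StompSessionManager.
class SharedSessionSketch {

    /** Callback an endpoint registers to be told about the shared session. */
    interface SessionListener {
        void afterConnected(String sessionId);
    }

    /** Owns at most one session and hands it to every registered listener. */
    static class SessionManager {
        private final List<SessionListener> listeners = new CopyOnWriteArrayList<>();
        private String sessionId;   // null while disconnected
        private int connectCount;   // number of sessions actually opened

        synchronized void connect(SessionListener listener) {
            listeners.add(listener);
            if (sessionId == null) {
                // Only the first endpoint triggers a real connection.
                connectCount++;
                sessionId = "session-" + connectCount;
            }
            listener.afterConnected(sessionId);
        }

        synchronized void disconnect(SessionListener listener) {
            listeners.remove(listener);
            if (listeners.isEmpty()) {
                // Assumption of this sketch: last endpoint gone, so close the session.
                sessionId = null;
            }
        }

        synchronized int connectCount() {
            return connectCount;
        }

        synchronized boolean isConnected() {
            return sessionId != null;
        }
    }

    /** Scenario: an inbound and an outbound endpoint share one manager. */
    static List<String> run() {
        SessionManager manager = new SessionManager();
        List<String> seen = new ArrayList<>();
        SessionListener inbound = id -> seen.add("inbound:" + id);
        SessionListener outbound = id -> seen.add("outbound:" + id);

        manager.connect(inbound);
        manager.connect(outbound);
        seen.add("connects:" + manager.connectCount());

        manager.disconnect(inbound);
        seen.add("connectedAfterOneLeaves:" + manager.isConnected());
        manager.disconnect(outbound);
        seen.add("connectedAfterAllLeave:" + manager.isConnected());
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model both endpoints observe the same session identifier and only one connection is opened, which is the property that makes a single StompSessionManager bean suitable for an inbound adapter and an outbound handler talking to the same broker.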