I have a websocket endpoint that allows clients to subscribe to messages that pertain to a specific conversation. The messages are kept in a MongoDb collection since there are multiple instances of the service at any given time. As such, each web socket session needs to spin up a poller that queries only for messages that pertain to it.
It is also possible for more that one websocket session to be interested in the same conversation.
The websocket client is expected to provide a conversationId query param during the handshake (although I think this could be made into part of the URL maybe):
ws://my.api.com/wsflow/websocket?conversationId=12345
I created a ServerWebSocketContainer
and set a HandshakeInterceptor
that gets the conversationId query param and saves it as an attribute. Then I set a WebSocketHandlerDecorator and overrode the afterConnectionEstablished()
to create a flow that polls for messages related to that conversation id. For each message it enriches the sessionId
header with the id of the WS session. Then it sends it off to another channel which has a WebSocketOutboundMessageHandler on the other end.
In order to prevent duplicate messages and to also allow for multiple sessions subscribing to the same conversation, I have included a "sessions" array on the document. Thus, the poller query will search for messages with the provided conversation id, and also where the current ws session id is not in the array of sessions that have already seen the message.
This means the inbound adapter must also have an update statement that does a push
which appends the current session id to the array in the payload. This feels a bit idiosyncratic, however I can't think of another way that doesn't increase the complexity too much.
The code is shown below. It seems to work (locally at least), but I'm not sure if there is a more natural way to do this. I also haven't determined if I really need to include useFlowIdAsPrefix()
and if that has an impact on the afterConnectionClosed()
call which simply does flowContext.remove()
.
@Bean
public ServerWebSocketContainer serverContainer(MongoTemplate mongoTemplate, IntegrationFlowContext flowContext) {
ServerWebSocketContainer container = new ServerWebSocketContainer("/wsflow").withSockJs(
new SockJsServiceOptions().setHeartbeatTime(5000L).setTaskScheduler(new TaskSchedulerBuilder().build()));
container.setInterceptors(new HandshakeInterceptor() {
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception exception) {}
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
String conversationId = ((ServletServerHttpRequest)request).getServletRequest().getParameter("conversationId");
attributes.put("conversationId", conversationId);
return true;
}
});
container.setDecoratorFactories(handler -> new WebSocketHandlerDecorator(handler) {
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
super.afterConnectionEstablished(session);
String conversationId = session.getAttributes().get("conversationId").toString();
IntegrationFlow f = IntegrationFlow.from(MongoDb.inboundChannelAdapter(mongoTemplate.getMongoDatabaseFactory(),
Query.query(Criteria.where("conversationId").is(conversationId).and("sessions").nin(session.getId())))
.collectionName("test").entityClass(Document.class)
.update(new Update().push("sessions", session.getId())),
p -> p.poller(pm -> pm.fixedDelay(1000L)))
.split()
.enrichHeaders(s -> s.header(SimpMessageHeaderAccessor.SESSION_ID_HEADER, session.getId()))
.channel("wsMessages")
.get();
flowContext.registration(f).id(session.getId()).useFlowIdAsPrefix().register();
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
super.afterConnectionClosed(session, closeStatus);
flowContext.remove(session.getId());
}
});
return container;
}
@Bean
public MessageHandler webSocketOutboundAdapter(ServerWebSocketContainer container) {
return new WebSocketOutboundMessageHandler(container);
}
@Bean
public IntegrationFlow wsOut(MessageHandler webSocketOutboundAdapter) {
return IntegrationFlow.from("wsMessages")
.handle(webSocketOutboundAdapter).get();
}
@Bean
public QueueChannelSpec wsMessages() {
return MessageChannels.queue();
}
Since you talk about persistence and no duplication, I don't think that your idea about "seen sessions" is so bad.
I cannot think about more optimal solution than you already have.
So, there is a single document in MongoDB, which belongs to some conversation has an attribute with the session list which have already seen that message. So, you have already optimized it enough to not pull from DB those messages which are already processed. Even after restart you still will be good. However you might also need to think about removing closed sessions from that list. I'm not sure that new session from the same client would have same id
to persist it forever.