Given an Rsocket endpoint (Spring)
@MessageMapping("chat.{chatId}")
Flux<Message> getChats(@DestinationVariable String chatId) {
Mono<Chat> data = chatRepository.findById(chatId);
return data.map(chatGroup -> chatGroup.getMessages())
.flatMapMany(Flux::fromIterable);
}
public interface ChatRepository extends FirestoreReactiveRepository<Chat> {
}
and a client connecting to it via websocket: JS (Rsocket 1.x)
const makeConnector = () => {
return new RSocketConnector({
setup: {
dataMimeType: 'application/json',
keepAlive: 100000,
lifetime: 100000,
metadataMimeType: 'message/x.rsocket.routing.v0',
},
transport: new WebsocketClientTransport({
url: 'ws://localhost:7000/rsocket',
}),
});
};
client()
.connect()
.then((socket) => {
const requester = socket.requestStream(
{
data: undefined,
metadata: Buffer.concat([
Buffer.from(String.fromCharCode('chat.chatId'.length)),
Buffer.from('chat.chatId'),
]),
},
10,
{
onError: (e) => console.log('error getting data', e),
onNext: (payload, isComplete) => {
const parsedData: Chat = JSON.parse(payload.data.toString());
requester.request(5); // request the next 5 chats
},
onComplete: () => {
console.log('complete');
},
onExtension: () => {
console.log('on extension');
},
}
);
});
When the requestStream
data is completed. (ie. complete
is printed in console)
Then the new events are no longer received after complete. Is it possible to listen to new events if a stream is completed?
I went around this using an intervals, but this introduce new issues such as duplicate chats being received. I can filter it on the frontend side, but there must be a better way to do this
return Flux.interval(Duration.ofSeconds(5)).flatMap(x -> {
return chatService.findChats(matchId);
});
RSocket-JS does not provide by default any "re-subscription" mechanism when a connection is closed : Meaning when your connection is closed or completed, you can not receive events on this connection anymore.
In RSocket, when the WebSocket connection status is 'CLOSED' or 'ERROR', the subscription is cancelled. Meaning you can't receive any new events anymore.
You can see it in their RSocketClient.js, line 91 : https://github.com/rsocket/rsocket-js/blob/master/packages/rsocket-core/src/RSocketClient.js
If you would like to continue receiving new events after your current connection was closed, you need to create another one. To do it, you can inspire you from the example provided by RSocket here : https://github.com/rsocket/rsocket-js/blob/master/packages/rsocket-examples/src/ReconnectExample.js
Personally, I used it in the current project I'm working on, but I re-worked it a bit, in order to add a few additional things :
You can find the complete implementation I did here : https://github.com/icure-io/icure-medical-device-js-sdk/blob/master/src/utils/rsocket.ts