A sample project is available on GitHub: https://github.com/codependent/rsocket-rating-service
A Spring Boot RSocket server exposes a message mapping that handles a requestResponse interaction and returns a simple POJO:
@MessageMapping("request-rating")
fun getRatingWebSocket(ratingRequest: RatingRequest): Mono<Rating> {
    return Mono.just(Rating(ratingRequest.songId, (0..10).random())).log()
        .doOnNext {
            logger.info("Next {}", it)
        }
        .doOnCancel {
            logger.info("Cancel")
        }
        .doOnSuccess {
            logger.info("Success {}", it)
        }
        .doOnError { throwable ->
            logger.error("Error {}", throwable)
        }
        .doOnTerminate { logger.info("Terminate") }
}
On the client side I have the following JS code to connect to the RSocket server and request a value:
const {
  RSocketClient,
  JsonSerializer,
  IdentitySerializer,
} = require('rsocket-core');
const RSocketWebSocketClient = require('rsocket-websocket-client').default;

const route = 'request-rating';
let client = undefined;
let rSocket = undefined;

function main() {
  if (client !== undefined) {
    client.close();
  }
  client = new RSocketClient({
    serializers: {
      data: JsonSerializer,
      metadata: IdentitySerializer
    },
    setup: {
      // ms btw sending keepalive to server
      keepAlive: 60000,
      // ms timeout if no keepalive response
      lifetime: 180000,
      // format of `data`
      dataMimeType: 'application/json',
      // format of `metadata`
      metadataMimeType: 'message/x.rsocket.routing.v0',
    },
    transport: new RSocketWebSocketClient({
      url: 'ws://localhost:8080/rating-ws'
    }),
  });

  // Open the connection
  client.connect().subscribe({
    onComplete: socket => {
      // socket provides the rsocket interactions fire/forget, request/response,
      // request/stream, etc as well as methods to close the socket.
      rSocket = socket;
    },
    onError: error => {
      console.log("Connection has been refused due to ", error);
    },
    onSubscribe: cancel => {
      /* call cancel() to abort */
    }
  });

  document.getElementById('sendButton').addEventListener('click', requestRating);
}

function requestRating() {
  rSocket.requestResponse({
    data: {
      'songId': document.getElementById("songId").value
    },
    metadata: String.fromCharCode(route.length) + route
  }).subscribe({
    onComplete: () => {
      console.log('Complete')
    },
    onError: error => {
      console.log("Connection has been closed due to " + error);
    },
    onNext: payload => {
      console.log(payload.data);
    },
    onSubscribe: subscription => {
      //subscription.request(1)
      console.log("Subscribed")
    }
  });
}

document.addEventListener('DOMContentLoaded', main);
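The metadata expression in requestRating builds length-prefixed routing metadata: one length byte followed by the route name. A minimal sketch of that step as a standalone function, assuming ASCII-only route names (so that the string length equals the byte length); encodeRoute is a hypothetical helper, not part of rsocket-js:

```javascript
// Hypothetical helper: builds routing metadata for a single route
// in the length-prefixed form used by the client code above.
function encodeRoute(route) {
  // Assumption: ASCII-only routes, so route.length equals the UTF-8 byte length
  if (!/^[\x00-\x7f]*$/.test(route)) {
    throw new RangeError('route must be ASCII for this sketch');
  }
  // The length prefix is a single byte, so a route can be at most 255 bytes
  if (route.length > 255) {
    throw new RangeError('route is longer than 255 bytes');
  }
  return String.fromCharCode(route.length) + route;
}

const metadata = encodeRoute('request-rating');
console.log(metadata.charCodeAt(0), metadata.slice(1));
```

The first character carries the length, which is why the client prepends String.fromCharCode(route.length) instead of sending the bare route string.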
index.html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Rating Service</title>
    <link href="/webjars/bootstrap/css/bootstrap.min.css" rel="stylesheet">
    <script src="/webjars/jquery/jquery.min.js"></script>
    <script src="/webjars/stomp-websocket/stomp.min.js"></script>
    <script src="bundle.js"></script>
</head>
<body>
<div>Request your rating</div>
<div>
    <label> ClientId:
        <input id="songId" type="text" name="songId"/>
    </label>
</div>
<div><input id="sendButton" type="button" name="send" value="Send"/></div>
</body>
</html>
After opening http://localhost:8080/index.html, enter some text and click Send.
The request reaches the server, which logs the correct generation of an onNext value:
[ctor-http-nio-6] reactor.Mono.Just.1 : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ctor-http-nio-6] reactor.Mono.Just.1 : | request(1)
[ctor-http-nio-6] reactor.Mono.Just.1 : | onNext(Rating(songId=asdfas, value=0))
[ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController : Next Rating(songId=asdfas, value=0)
[ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController : Success Rating(songId=asdfas, value=0)
[ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController : Terminate
[ctor-http-nio-6] reactor.Mono.Just.1 : | onComplete()
However, on the client side,
onNext: payload => {
console.log(payload.data);
},
is never called; the browser log shows only:
Subscribed
Complete
Why isn't it getting the generated value from the server?
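In RSocket JS, unlike in Reactive Streams, the response of a requestResponse interaction is delivered through the onComplete callback rather than onNext. requestResponse returns a Single, which has no onNext; its onComplete receives the payload.
This doesn't seem to follow the protocol spec (https://rsocket.io/about/protocol):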
Both (C)omplete and (N)ext set meaning PAYLOAD contains data and signals stream completion. For example: An Observable stream receiving onNext(payload) followed by onComplete().
Just (C)omplete set meaning PAYLOAD contains no data and only signals stream completion. For example: An Observable stream receiving onComplete().
Just (N)ext set meaning PAYLOAD contains data stream is NOT completed. For example: An Observable stream receiving onNext(payload).
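The three flag combinations quoted above can be sketched as a small decoder. This is only an illustration of the spec wording: signalsFor is a hypothetical function, and the bit values 0x40 (Complete) and 0x20 (Next) are the PAYLOAD frame flags as I read them in the protocol spec.

```javascript
// PAYLOAD frame flag bits (taken from the RSocket protocol spec)
const COMPLETE = 0x40;
const NEXT = 0x20;

// Hypothetical helper: lists the Reactive Streams signals that a
// PAYLOAD frame with the given flags corresponds to, in order.
function signalsFor(flags) {
  const signals = [];
  if (flags & NEXT) {
    signals.push('onNext(payload)'); // frame carries data
  }
  if (flags & COMPLETE) {
    signals.push('onComplete()'); // frame ends the stream
  }
  return signals;
}

console.log(signalsFor(NEXT | COMPLETE));
console.log(signalsFor(COMPLETE));
console.log(signalsFor(NEXT));
```

A requestResponse reply is the first case: one frame with both bits set, which a Reactive Streams subscriber would see as onNext followed by onComplete.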
To work around this, I had to change the callback to:
onComplete: completeData => {
  console.log('Complete ' + completeData.data)
},
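If you would rather keep the Reactive Streams style callbacks from the question, a small adapter can forward the payload from onComplete to onNext. A minimal sketch, assuming the subscriber shape used in the question; toSingleSubscriber and the fakeResponse stand-in are hypothetical and not part of rsocket-js:

```javascript
// Hypothetical adapter: wraps onNext/onComplete style callbacks in the
// single-value subscriber shape that requestResponse() calls.
function toSingleSubscriber({ onNext, onComplete, onError, onSubscribe }) {
  return {
    // The response payload arrives here, so replay it as onNext + onComplete
    onComplete: payload => {
      if (onNext) onNext(payload);
      if (onComplete) onComplete();
    },
    onError: error => {
      if (onError) onError(error);
    },
    // For a single-value response this receives a cancel function
    onSubscribe: cancel => {
      if (onSubscribe) onSubscribe(cancel);
    },
  };
}

// Stand-in for rSocket.requestResponse(...) so the sketch runs without a server
const fakeResponse = {
  subscribe(subscriber) {
    subscriber.onSubscribe(() => {});
    subscriber.onComplete({ data: { songId: 'abc', value: 7 } });
  },
};

const events = [];
fakeResponse.subscribe(toSingleSubscriber({
  onSubscribe: () => events.push('Subscribed'),
  onNext: payload => events.push(payload.data),
  onComplete: () => events.push('Complete'),
}));
console.log(events);
```

With the real client, the same adapter would be passed to rSocket.requestResponse(...).subscribe(...), so the original onNext handler logs the rating before Complete is printed.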