I try to implement an RSocket server in Java and a client in JavaScript, but I can't call any of the methods in my backend.
Java server
public final class RawServer {
public static void main(String[] args) {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new DefaultSimpleService()))
.transport(WebsocketServerTransport.create("localhost", 8801))
.start()
.block()
.onClose()
.block();
}
private static final class DefaultSimpleService extends AbstractRSocket {
private ObjectMapper jsonMapper = new ObjectMapper();
@Override
public Flux<Payload> requestStream(Payload payload) {
return Mono.just(payload.getDataUtf8())
.map(json -> {
try {
return jsonMapper.readValue(json, Message.class);
} catch (IOException e) {
e.printStackTrace();
return null;
}
})
.doOnNext(msg -> System.out.println("got message " + msg.message))
.flatMapMany(msg -> Flux.range(0, 5)
.map(count -> msg.message + " #" + count))
.map(message -> DefaultPayload.create(message));
}
}
}
public class Message {
public final String message;
@JsonCreator
public Message(@JsonProperty("message") String message) {
this.message = message;
}
}
JavaScript client
import { RSocketClient, JsonSerializers } from "rsocket-core";
import RSocketWebSocketClient from "rsocket-websocket-client";
const transport = new RSocketWebSocketClient({
url: "ws://localhost:8801"
});
const client = new RSocketClient({
// send/receive JSON objects instead of strings/buffers
serializers: JsonSerializers,
setup: {
// ms btw sending keepalive to server
keepAlive: 60000,
// ms timeout if no keepalive response
lifetime: 180000,
// format of `data`
dataMimeType: "application/json",
// format of `metadata`
metadataMimeType: "application/json"
},
transport
});
client.connect().subscribe({
onComplete: socket => {
socket.requestStream({
data: { message: "hello from javascript!" },
metadata: null
});
},
onError: error => {
console.log("got error");
console.error(error);
},
onSubscribe: cancel => {
/* call cancel() to abort */
console.log("subscribe!");
console.log(cancel);
// cancel.cancel();
}
});
It seems like the WebSocket connection is established, but no message is pushed to the server. How can I do this?
I also implemented the client side in Java and it work's fine. The JavaScript example I found is https://github.com/rsocket/rsocket-js/blob/master/docs/01-client-configuration.md, but I can't make it work.
For more examples with RSocket you can visit my personal blog http://kojotdev.com/2019/09/rsocket-examples-java-javascript-spring-webflux/
Ok, I figured it out. First, we need to fix our server to return the correct JSON object.
@Override
public Flux<Payload> requestStream(Payload payload) {
log.info("got requestStream in Server");
log.info(payload.getDataUtf8());
return Mono.just(payload.getDataUtf8())
.map(payloadString -> MessageMapper.jsonToMessage(payloadString))
.flatMapMany(msg -> Flux.range(0, 5)
.map(count -> msg.message + " | requestStream from Server #" + count)
.map(responseText -> new Message(responseText))
.map(responseMessage -> MessageMapper.messageToJson(responseMessage)))
.map(message -> DefaultPayload.create(message));
}
Then, in our JavaScript client, we need to change the socket.requestStream
method to this:
socket
.requestStream({
data: { message: "request - stream from javascript!" },
metadata: ""
})
.subscribe({
onComplete: () => console.log("requestStream done"),
onError: error => {
console.log("got error with requestStream");
console.error(error);
},
onNext: value => {
// console.log("got next value in requestStream..");
console.log(value.data);
},
// Nothing happens until `request(n)` is called
onSubscribe: sub => {
console.log("subscribe request Stream!");
sub.request(7);
}
});
Everything else stays as in the previous example. Helpful links: