spring-webfluxspring-websocketrsocketspring-rsocketrsocket-js

Rsocket-js Unable to set destination route when connection to channel react native client


I am new to Rsocket and reactive Websocket and I develop an application where I need multiple channels, where my clients can subscribe. Until now I tried 2 solutions but unfortunately non of them workd as I expected.

  1. I implemented a reactive Websocket server with Spring where I stored every client connection in a FluxSink list, but unfortunately with that soultion I couldnt manage to implement channels, and that is the reason I tried RSocket. My first question is it possible to implement channels with this workflow? Do I understend correctly only one websocket connection should be open between one client and the server? Because that is the reason I spend so mutch time trying to figure out how to create channels.

  2. With my fist soulion I could establish connection between my server and my client and they could communicate but unfortunately I couldt create channels. So I tried RSocket because I read that with this protocol it is quite easy to create bidirectional channels, but unfortunately I cannot connect to them

I have a Controller with this @MessageMapping

@MessageMapping("my-channel")
public Flux<String> channel(final Flux<String> message) {
  log.info("Received stream-stream (channel) request... ");

  return message
        .doOnNext(m -> log.info("Request: ", m))
        .doOnCancel(() -> log.warn("Client cancelled the channel"))
        .switchMap(setting -> Flux.interval(Duration.ofSeconds(10)).map(index -> "Love spring if it works"))
        .log()
        ;
 }

When I try to connect with rsc client everything works fine.

./rsc --debug --channel --data=asd --route=my-channel --stacktrace ws://localhost:7000

But unfortunately when I try to connect from my react native client the Destination route is alwayes ''. Of course if I change @MessageMapping("my-channel") -> @MessageMapping("") it works perfectly but for obvious reasions this solution is not appropriate for me.

const connector = new RSocketConnector({
  setup: {
    keepAlive: 100,
    lifetime: 10000,
    metadataMimeType: "application/json",
    dataMimeType: "message/x.rsocket.routing.v0"
  },
  transport: new WebsocketClientTransport({
    url: "ws://192.168.0.107:7000",
    wsCreator: (url) => new WebSocket(url) as any,
  }),
});

const rsocket = await connector.connect();

await new Promise((resolve, reject) => {
  const requester = rsocket.requestChannel(
    {
      data: undefined,
      metadata: Buffer.concat([
        Buffer.from(String.fromCharCode('my-channel'.length)),
        Buffer.from('my-channel'),
      ]),
    },
    1,
    false,
    {
      onError: (e) => reject(e),
      onNext: (payload, isComplete) => {
        console.log(
          `payload[data: ${payload.data}; metadata: ${payload.metadata}]|${isComplete}`
        );

        requester.request(1);

        if (isComplete) {
          resolve(payload);
        }
      },
      onComplete: () => {
        resolve(null);
      },
      onExtension: () => { },
      request: (n) => {
        console.log(`request(${n})`);
        requester.onNext(
          {
            data: Buffer.from("Message"),
          },
          true
        );
      },
      cancel: () => { },
    }
  );
}).catch(e => {
  console.log("-------------ERRORRRRRR-------------")
  console.error(e);
});

The error message is this:

-------------ERRORRRRRR-------------
ERROR  [Error: Destination '' does not support REQUEST_CHANNEL. Supported interaction(s): [METADATA_PUSH, SETUP]]

What am I doing wrong?

And a last question how can I send additional header when I establish the connection?

Thank you for your help in advance.


Solution

  • With the help of OlegDokuka in this discussion I could manage the issue. The only problem was as he mentioned.

    Actual:

    metadataMimeType: "application/json",
    dataMimeType: "message/x.rsocket.routing.v0"
    

    Have to be

    dataMimeType: "application/json",
    metadataMimeType: "message/x.rsocket.routing.v0"