typescript spring-boot rsocket rsocket-js

How to do an infinite requestStream() in RSocket-JS? What happens when subscription.request() runs out?


I'm trying to create a situation where the RSocket JS client listens to a Spring Boot server. When the server has some data for the client, it sends it and the client onNext() triggers and starts processing/displaying that data. Simple enough, right?

I've seen lots of examples, including one from the official RSocket-JS repository, proposing this kind of setup:

socket
      .requestStream({
        data: new Buffer('request-stream'),
        metadata: null,
      })
      .subscribe({
        onNext: value => console.log('%s', value.data),
        onSubscribe: sub => sub.request(2147483647),
      });

Here sub.request(2147483647) requests n items, where n is the maximum int32 value. However, n is not infinite. What happens once the server has sent 2147483647 items? Will the client stop accepting new data? If so, what do you do when that happens, and how can you even tell that it has happened? Also, how do you stop this infinite request loop if you need to?

I came up with this:

var subscription: ISubscription = null;
socket
    .requestStream({
        data: new Buffer('request-stream'),
        metadata: null,
    })
    .subscribe({
        onNext: value => {
            console.log('%s', value.data)
            someDataProcessing()
            subscription.request(1)
        },
        onSubscribe: sub => {
            sub.request(1)
            subscription = sub
        },
    });

However, while testing this I came across another issue: for some reason, after a few requests, the request loop ends. I'm not sure why this happens, but I'm guessing it might be because someDataProcessing() could be blocking (looping through the data, for example), which in turn causes the client to "miss" the new data. We can fix this by increasing the 1 to, for example, 5, but that causes the requests to pile up, so I'm not sure that's the best solution.


Solution

  • How to do an infinite requestStream() in RSocket-JS?

    RSocket-JS does not provide any "re-subscription" mechanism by default: once your connection is closed or completed, you can no longer receive events on it.

    This is because events are managed by a Subscription/Subscriber mechanism internal to RSocket: when the connection status becomes 'CLOSED' or 'ERROR', the subscription is cancelled.

    You can see this in RSocketClient.js, line 91: https://github.com/rsocket/rsocket-js/blob/master/packages/rsocket-core/src/RSocketClient.js

    This means that to provide an infinite requestStream(), you need to wrap it in a retry mechanism that creates a new connection whenever the previous one is lost (which can happen quite often, for example when the server is unavailable during an update). You also need to take care of the subscription itself; otherwise your solution will not work properly either.
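
    As a rough illustration of the retry wrapper described above, here is a minimal sketch. The connectWithRetry name and its parameters are hypothetical, and connect stands for any function that returns a Promise of a connected socket (with RSocket-JS you would have to adapt the client's connect call into that shape yourself).

```typescript
// Hypothetical helper (not part of RSocket-JS): retries an async connect
// function with a fixed delay between attempts.
// maxRetry is the number of retries AFTER the first attempt.
async function connectWithRetry<T>(
    connect: () => Promise<T>,
    maxRetry: number,
    retryIntervalInMs: number,
): Promise<T> {
    let lastError: unknown = new Error('connect was never attempted');
    for (let attempt = 0; attempt <= maxRetry; attempt++) {
        try {
            // Resolves with the connected socket as soon as one attempt succeeds
            return await connect();
        } catch (error) {
            lastError = error;
            if (attempt < maxRetry) {
                // Wait before the next attempt
                await new Promise<void>(resolve => setTimeout(resolve, retryIntervalInMs));
            }
        }
    }
    // Every attempt failed: surface the last error to the caller
    throw lastError;
}
```

    A fixed interval keeps the sketch short; an exponential backoff would probably be kinder to a server that is restarting.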

  • What happens when subscription.request() runs out?

    The number you pass to subscription.request() is the number of events your subscriber is prepared to receive, in other words the maximum number of times onNext() will be called. Once that many events have been delivered, if I'm correct, the connection stays alive but onNext() receives nothing more until you call request() again.

    This needs to be taken into account as well when you build the retry mechanism for your infinite requestStream().
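
    One way to keep demand from running out, without requesting one item at a time, is to request in batches and top up when a batch has been consumed. The sketch below is only an illustration: DemandSubscription and batchingSubscriber are hypothetical names, and the interface is a minimal stand-in for the subscription object passed to onSubscribe.

```typescript
// Minimal stand-in for the subscription handed to onSubscribe
// (assumption: only request() and cancel() are needed here).
interface DemandSubscription {
    request(n: number): void;
    cancel(): void;
}

// Hypothetical subscriber factory: asks for `batchSize` items up front and
// requests another batch each time a full batch has been consumed.
function batchingSubscriber<T>(batchSize: number, handle: (value: T) => void) {
    let subscription: DemandSubscription | null = null;
    let received = 0;
    return {
        onSubscribe(sub: DemandSubscription): void {
            subscription = sub;
            sub.request(batchSize);
        },
        onNext(value: T): void {
            handle(value);
            received++;
            // A whole batch has been processed: replenish the demand
            if (received % batchSize === 0 && subscription) {
                subscription.request(batchSize);
            }
        },
        onComplete(): void {},
        onError(error: Error): void {
            console.error('stream failed', error);
        },
        // Lets the caller end the otherwise endless stream
        stop(): void {
            subscription?.cancel();
            subscription = null;
        },
    };
}
```

    Because new demand is only signalled after the handler has run, a slow handler simply delays the next batch instead of letting requests pile up, and stop() gives you a way out of the loop.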

  • Then how to create the infinite requestStream()?

    To create your infinite requestStream() and properly manage the subscription issue as well, you can take inspiration from an example provided by RSocket, which takes care of the connection and its related Subscription: https://github.com/rsocket/rsocket-js/blob/master/packages/rsocket-examples/src/ReconnectExample.js

    Personally, I used it in the project I'm currently working on, after reworking it a bit to add a few things.

    With the project-specific code cleaned up, the client-side requestStream call looks something like this:

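    // Imports assume the rsocket-js 0.0.x packages
    import { APPLICATION_JSON, BufferEncoders, IdentitySerializer, JsonSerializer, MESSAGE_RSOCKET_AUTHENTICATION,
        MESSAGE_RSOCKET_COMPOSITE_METADATA, MESSAGE_RSOCKET_ROUTING, RSocketClient, encodeCompositeMetadata,
        encodeRoute, encodeSimpleAuthMetadata } from 'rsocket-core'
    import { Flowable } from 'rsocket-flowable'
    import RSocketWebSocketClient from 'rsocket-websocket-client'
    // ICureRSocket and ResubscribeOperator come from the complete implementation linked below;
    // username and password are assumed to be defined elsewhere.

    export function infiniteRequestStream(maxRetry: number, retryIntervalInMs: number): Promise<ICureRSocket<any, any>> {
        const auth = encodeSimpleAuthMetadata(username, password)
        const setup = {
            // keepAlive: ms between keepalive frames; lifetime: ms without a response before
            // the connection is treated as dead (lifetime is usually the larger of the two)
            keepAlive: 1000000,
            lifetime: 100000,
            dataMimeType: APPLICATION_JSON.string,
            metadataMimeType: MESSAGE_RSOCKET_COMPOSITE_METADATA.string
        };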
    
        const clientFactory = () => new RSocketClient({
            serializers: {
                data: {
                    deserialize: JsonSerializer.deserialize,
                    serialize: (x) => Buffer.from(JSON.stringify(x))
                },
                metadata: IdentitySerializer,
            },
            setup,
            transport: new RSocketWebSocketClient({
                url: `ws://localhost:8080/rsocket`,
                debug: true,
                wsCreator: (url) => {
                    return new WebSocket(url) as any;
                },
            }, BufferEncoders)
        });
    
        const requestStreamFlowable = (socket: ICureRSocket<unknown, string | Buffer | Uint8Array>, auth: Buffer) => {
            return new Flowable((subscriber) => {
                socket.requestStream({
                        data: Buffer.from('request-stream'),
                        metadata: encodeCompositeMetadata([
                            [MESSAGE_RSOCKET_ROUTING, encodeRoute('your-rsocket-route')],
                            [MESSAGE_RSOCKET_AUTHENTICATION, auth]
                        ])
                    }
                ).subscribe(subscriber)
            });
        }
    
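        return new Promise(async (resolve, reject) => {
            try {
                const socket = new ICureRSocket(clientFactory, reject);
                await socket.connect(maxRetry, retryIntervalInMs)
    
                const request = requestStreamFlowable(socket, auth);
    
                request
                    // ResubscribeOperator (from the linked implementation) handles resubscribing
                    .lift(actual => new ResubscribeOperator(request, actual))
                    .subscribe({
                        onSubscribe: (sub) => sub.request(2147483647),
                        onComplete: () => console.log(`Request-Stream Completed`),
                        onNext: (payload: any) => {
                            // payload is an object, so log its data rather than "[object Object]"
                            console.log(`Your next event is ${JSON.stringify(payload.data)}`)
                        },
                        onError: error => {
                            console.log(`Request-Stream Error ${error}`)
                        }
                    })
    
                resolve(socket)
            } catch (error) {
                // Without this, a failed connect() would leave the promise pending forever
                reject(error)
            }
        })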
    }
    

    The complete implementation is available here: https://github.com/icure-io/icure-medical-device-js-sdk/blob/master/src/utils/rsocket.ts
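
    The ResubscribeOperator used in the snippet lives in that linked file and is not reproduced here. To give an idea of the general technique only, here is a minimal sketch that is not the linked implementation: Sub, StreamSubscriber, Source and subscribeForever are hypothetical names, and the source is modelled as a plain function rather than an RSocket-JS Flowable.

```typescript
// Minimal stand-ins for the reactive-streams shapes (assumptions for this sketch)
interface Sub {
    request(n: number): void;
    cancel(): void;
}
interface StreamSubscriber<T> {
    onSubscribe(sub: Sub): void;
    onNext(value: T): void;
    onError(error: Error): void;
    onComplete(): void;
}
type Source<T> = (subscriber: StreamSubscriber<T>) => void;

// Hypothetical helper: subscribes to `source` and subscribes again whenever the
// stream errors or completes, up to `maxResubscribes` times.
// Returns a function that stops the loop and cancels the current subscription.
function subscribeForever<T>(
    source: Source<T>,
    onNext: (value: T) => void,
    requestN: number,
    maxResubscribes: number,
): () => void {
    let current: Sub | null = null;
    let stopped = false;
    let resubscribes = 0;

    function start(): void {
        source({
            onSubscribe(sub: Sub): void {
                current = sub;
                sub.request(requestN);
            },
            onNext,
            onError: resubscribe,
            onComplete: resubscribe,
        });
    }

    function resubscribe(): void {
        if (stopped || resubscribes >= maxResubscribes) return;
        resubscribes++;
        start();
    }

    start();
    return () => {
        stopped = true;
        current?.cancel();
    };
}
```

    A real client would normally wait between resubscriptions and reconnect the underlying socket first; both are left out to keep the sketch short.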