
Parallel requests from one client processed in series in RSocket


I expect all invocations of the server to be processed in parallel, but that is not what happens. Here is a simple example.

RSocket version: 1.1.0

Server

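import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import io.rsocket.util.DefaultPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;

public class ServerApp {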
    private static final Logger log = LoggerFactory.getLogger(ServerApp.class);

    public static void main(String[] args) throws InterruptedException {
        RSocketServer.create(SocketAcceptor.forRequestResponse(payload ->
                Mono.fromCallable(() -> {
                    log.debug("Start of my business logic");
                    sleepSeconds(5);
                    return DefaultPayload.create("OK");
                })))
                .bind(WebsocketServerTransport.create(15000))
                .block();
        log.debug("Server started");
        TimeUnit.MINUTES.sleep(30);
    }

    private static void sleepSeconds(int sec) {
        try {
            TimeUnit.SECONDS.sleep(sec);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Client

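import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.util.DefaultPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class ClientApp {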
    private static final Logger log = LoggerFactory.getLogger(ClientApp.class);

    public static void main(String[] args) throws InterruptedException {
        RSocket client = RSocketConnector.create()
                .connect(WebsocketClientTransport.create(15000))
                .block();

        long start1 = System.currentTimeMillis();
        client.requestResponse(DefaultPayload.create("Request 1"))
                .doOnNext(r -> log.debug("finished within {}ms", System.currentTimeMillis() - start1))
                .subscribe();

        long start2 = System.currentTimeMillis();
        client.requestResponse(DefaultPayload.create("Request 2"))
                .doOnNext(r -> log.debug("finished within {}ms", System.currentTimeMillis() - start2))
                .subscribe();

        TimeUnit.SECONDS.sleep(20);
    }
}

In the client logs, we can see that both requests were sent at the same time, and both responses were received at the same time after 10 seconds (each request was processed in 5 seconds).

In the server logs, we can see that the requests were executed sequentially, not in parallel.

Could you please help me to understand this behavior?

  1. Why did we receive the first response after 10 seconds and not 5?
  2. How do I create the server correctly if I want all requests to be processed in parallel?

If I replace Mono.fromCallable with Mono.fromFuture(CompletableFuture.supplyAsync(() -> myBusinessLogic(), executorService)), that resolves question 1.

If I replace Mono.fromCallable with Mono.delay(Duration.ZERO).map(ignore -> myBusinessLogic()), that resolves questions 1 and 2.

If I replace Mono.fromCallable with Mono.create(sink -> sink.success(myBusinessLogic())), that does not resolve either issue.

Client logs:

2021-07-16 10:39:46,880 DEBUG [reactor-tcp-nio-1] [/] - sending -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 56
Data:

2021-07-16 10:39:46,952 DEBUG [main] [/] - sending -> 
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 31                      |Request 1       |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:46,957 DEBUG [main] [/] - sending -> 
Frame => Stream ID: 3 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 32                      |Request 2       |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,043 DEBUG [reactor-tcp-nio-1] [/] - receiving -> 
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b                                           |OK              |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - finished within 10120ms
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - receiving -> 
Frame => Stream ID: 3 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b                                           |OK              |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - finished within 10094ms

Server logs:

2021-07-16 10:39:46,965 DEBUG [reactor-http-nio-2] [/] - receiving -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 56
Data:

2021-07-16 10:39:47,021 DEBUG [reactor-http-nio-2] [/] - receiving -> 
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 31                      |Request 1       |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:47,027 DEBUG [reactor-http-nio-2] [/] - Start of my business logic
2021-07-16 10:39:52,037 DEBUG [reactor-http-nio-2] [/] - sending -> 
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b                                           |OK              |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:52,038 DEBUG [reactor-http-nio-2] [/] - receiving -> 
Frame => Stream ID: 3 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 32                      |Request 2       |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:52,038 DEBUG [reactor-http-nio-2] [/] - Start of my business logic
2021-07-16 10:39:57,039 DEBUG [reactor-http-nio-2] [/] - sending -> 
Frame => Stream ID: 3 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b                                           |OK              |
+--------+-------------------------------------------------+----------------+

Solution

  • You shouldn't mix asynchronous code, such as Reactor Mono operations, with blocking code like this:

        private static void sleepSeconds(int sec) {
            try {
                TimeUnit.SECONDS.sleep(sec);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    

    I suspect the central issue here is that a framework like rsocket-java doesn't want to run everything on new threads, because that would cost excessive context switching. So it generally relies on you to run long-running CPU or IO operations appropriately.

    You should look at the various async delay operations instead, for example https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#delayElement-java.time.Duration-

    If your delay is meant to simulate a long-running operation, then you should look at subscribing on a different scheduler, such as https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html#boundedElastic--
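
As a minimal sketch of why the blocking call serializes the requests, the following JDK-only example (no RSocket or Reactor involved) uses a single-thread executor as a stand-in for the one thread, reactor-http-nio-2, that handles both requests in the server logs. The class EventLoopDemo, its method names, and the shortened 500 ms delay are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class EventLoopDemo {

    // Hypothetical stand-in for the blocking business logic from the question.
    static String blockingLogic(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
        return "OK";
    }

    // Submits two blocking tasks to the executor and returns the
    // elapsed time in milliseconds until both have completed.
    static long runTwoTasks(ExecutorService executor, long millis) {
        long start = System.nanoTime();
        CompletableFuture<String> first =
                CompletableFuture.supplyAsync(() -> blockingLogic(millis), executor);
        CompletableFuture<String> second =
                CompletableFuture.supplyAsync(() -> blockingLogic(millis), executor);
        CompletableFuture.allOf(first, second).join();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        // One thread only: the second task cannot start until the first returns.
        ExecutorService singleThread = Executors.newSingleThreadExecutor();
        // Separate worker pool: both blocking tasks can run at the same time.
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            System.out.println("single thread: " + runTwoTasks(singleThread, 500) + " ms");
            System.out.println("worker pool:   " + runTwoTasks(workers, 500) + " ms");
        } finally {
            singleThread.shutdown();
            workers.shutdown();
        }
    }
}
```

On the single thread the two tasks run one after another, so the total is about twice the task duration; on the worker pool they overlap. In Reactor terms, the second case roughly corresponds to Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic()), which moves the blocking callable off the thread that received the request.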