I expected all requests to the server to be processed in parallel, but that is not what happens. Here is a simple example.
RSocket version: 1.1.0
Server
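import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketServer;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import io.rsocket.util.DefaultPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;

public class ServerApp {
    private static final Logger log = LoggerFactory.getLogger(ServerApp.class);

    public static void main(String[] args) throws InterruptedException {
        RSocketServer.create(SocketAcceptor.forRequestResponse(payload ->
                Mono.fromCallable(() -> {
                    log.debug("Start of my business logic");
                    sleepSeconds(5); // simulates 5 seconds of business logic
                    return DefaultPayload.create("OK");
                })))
            .bind(WebsocketServerTransport.create(15000))
            .block();
        log.debug("Server started");
        // Keep the JVM alive while the server handles requests.
        TimeUnit.MINUTES.sleep(30);
    }

    private static void sleepSeconds(int sec) {
        try {
            TimeUnit.SECONDS.sleep(sec);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}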
Client
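import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.util.DefaultPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class ClientApp {
    private static final Logger log = LoggerFactory.getLogger(ClientApp.class);

    public static void main(String[] args) throws InterruptedException {
        RSocket client = RSocketConnector.create()
            .connect(WebsocketClientTransport.create(15000))
            .block();

        // Both requests are sent back to back, without waiting for a response.
        long start1 = System.currentTimeMillis();
        client.requestResponse(DefaultPayload.create("Request 1"))
            .doOnNext(r -> log.debug("finished within {}ms", System.currentTimeMillis() - start1))
            .subscribe();

        long start2 = System.currentTimeMillis();
        client.requestResponse(DefaultPayload.create("Request 2"))
            .doOnNext(r -> log.debug("finished within {}ms", System.currentTimeMillis() - start2))
            .subscribe();

        // Keep the JVM alive until both responses have arrived.
        TimeUnit.SECONDS.sleep(20);
    }
}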
1. In the client logs, we can see that both requests were sent at the same time, and both responses were received at the same time, after 10 seconds (each request took 5 seconds to process).
2. In the server logs, we can see that the requests were executed sequentially, not in parallel.
Could you please help me understand this behavior?
If I replace Mono.fromCallable with Mono.fromFuture(CompletableFuture.supplyAsync(() -> myBusinessLogic(), executorService)), then it resolves issue 1.
If I replace Mono.fromCallable with Mono.delay(Duration.ZERO).map(ignore -> myBusinessLogic()), then it resolves issues 1 and 2.
If I replace Mono.fromCallable with Mono.create(sink -> sink.success(myBusinessLogic())), then it resolves neither issue.
Client logs:
2021-07-16 10:39:46,880 DEBUG [reactor-tcp-nio-1] [/] - sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 56
Data:
2021-07-16 10:39:46,952 DEBUG [main] [/] - sending ->
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 31 |Request 1 |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:46,957 DEBUG [main] [/] - sending ->
Frame => Stream ID: 3 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 32 |Request 2 |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,043 DEBUG [reactor-tcp-nio-1] [/] - receiving ->
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b |OK |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - finished within 10120ms
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - receiving ->
Frame => Stream ID: 3 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b |OK |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:57,046 DEBUG [reactor-tcp-nio-1] [/] - finished within 10094ms
Server logs:
2021-07-16 10:39:46,965 DEBUG [reactor-http-nio-2] [/] - receiving ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 56
Data:
2021-07-16 10:39:47,021 DEBUG [reactor-http-nio-2] [/] - receiving ->
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 31 |Request 1 |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:47,027 DEBUG [reactor-http-nio-2] [/] - Start of my business logic
2021-07-16 10:39:52,037 DEBUG [reactor-http-nio-2] [/] - sending ->
Frame => Stream ID: 1 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b |OK |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:52,038 DEBUG [reactor-http-nio-2] [/] - receiving ->
Frame => Stream ID: 3 Type: REQUEST_RESPONSE Flags: 0b0 Length: 15
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 52 65 71 75 65 73 74 20 32 |Request 2 |
+--------+-------------------------------------------------+----------------+
2021-07-16 10:39:52,038 DEBUG [reactor-http-nio-2] [/] - Start of my business logic
2021-07-16 10:39:57,039 DEBUG [reactor-http-nio-2] [/] - sending ->
Frame => Stream ID: 3 Type: NEXT_COMPLETE Flags: 0b1100000 Length: 8
Data:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 4f 4b |OK |
+--------+-------------------------------------------------+----------------+
You shouldn't mix asynchronous code, such as reactive Mono operations, with blocking code like:
    private static void sleepSeconds(int sec) {
        try {
            TimeUnit.SECONDS.sleep(sec);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
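I suspect the central issue here is that a framework like rsocket-java doesn't want to run everything on new threads, because that would cost excessive context switching. So it generally relies on you to run long-running CPU or IO operations appropriately. In your server logs, both requests are handled on the same thread (reactor-http-nio-2), so the second request is not even read until the first handler has returned.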
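The effect can be reproduced without any networking library. The following is a minimal sketch using only the JDK, in which a single-threaded executor stands in for an event-loop thread. It is an analogy, not rsocket-java's actual internals, and the names EventLoopSketch, onSingleThread, and onWorkerPool are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EventLoopSketch {

    // Stand-in for blocking business logic such as sleepSeconds(5).
    static void blockingWork(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits `tasks` blocking jobs to the executor, waits for all of them,
    // and returns the total elapsed time in milliseconds.
    static long runAll(ExecutorService executor, int tasks, long millis) {
        long start = System.nanoTime();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            futures.add(CompletableFuture.runAsync(() -> blockingWork(millis), executor));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // One thread plays the role of the event loop: blocking jobs queue up behind each other.
    static long onSingleThread(int tasks, long millis) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            return runAll(eventLoop, tasks, millis);
        } finally {
            eventLoop.shutdown();
        }
    }

    // One worker per job: the blocking jobs overlap.
    static long onWorkerPool(int tasks, long millis) {
        ExecutorService workers = Executors.newFixedThreadPool(tasks);
        try {
            return runAll(workers, tasks, millis);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("single thread: " + onSingleThread(2, 500) + " ms");
        System.out.println("worker pool:   " + onWorkerPool(2, 500) + " ms");
    }
}
```

With two jobs, the single-thread run should take roughly twice as long as the worker-pool run, which mirrors the 10-second total seen in the client logs for two 5-second handlers.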
If you only need a delay, you should look at the various asynchronous delay operators instead, such as delayElement: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#delayElement-java.time.Duration-
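As a rough illustration of a non-blocking delay, here is a small Reactor-only sketch. The class name NonBlockingDelaySketch and the helper delayedOk are hypothetical, and the delays are shortened to one second to keep the demo quick.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

public class NonBlockingDelaySketch {

    // Emits "OK" after the given delay without parking the subscribing thread;
    // by default the timer runs on Reactor's parallel scheduler.
    static Mono<String> delayedOk(Duration delay) {
        return Mono.just("OK").delayElement(delay);
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Both delays are subscribed together, so they overlap instead of queueing.
        Mono.zip(delayedOk(Duration.ofSeconds(1)), delayedOk(Duration.ofSeconds(1)))
            .block();
        System.out.println("both done after " + (System.currentTimeMillis() - start) + " ms");
    }
}
```

In the server from the question, the same idea would mean returning something like Mono.just(DefaultPayload.create("OK")).delayElement(Duration.ofSeconds(5)) from the handler in place of the sleeping callable.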
If your delay is meant to simulate a long-running blocking operation, then you should look at subscribing on a different scheduler, such as boundedElastic: https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html#boundedElastic--
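A minimal sketch of that approach, using only Reactor so it can be run without a server. The class OffloadBlockingSketch and the helper handle are hypothetical, and myBusinessLogic here is a stand-in that sleeps and returns the name of the thread it ran on.

```java
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class OffloadBlockingSketch {

    // Hypothetical stand-in for the blocking business logic from the question.
    static String myBusinessLogic(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // subscribeOn moves the callable onto a boundedElastic worker, so the
    // subscribing thread (the Netty event loop, in the server) is released at once.
    static Mono<String> handle(long millis) {
        return Mono.fromCallable(() -> myBusinessLogic(millis))
                   .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        Mono.zip(handle(1000), handle(1000))
            .doOnNext(t -> System.out.println("ran on " + t.getT1() + " and " + t.getT2()))
            .block();
        System.out.println("both done after " + (System.currentTimeMillis() - start) + " ms");
    }
}
```

Applied to the server in the question, this would amount to appending .subscribeOn(Schedulers.boundedElastic()) to the Mono.fromCallable(...) returned by the request handler. I have not run that against the original setup, so treat it as a starting point.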