I have this code logic which sends a message from one server to another server using R-Socket and the entire code is written using reactor (Flux/Mono). Sometimes I notice some of the messages are not getting delivered but when tried to debug I find logs are missing after a certain step
logger.info("starting publish to other servers");
Mono<List<Map.Entry<String, GroupedFlux<String,OtherServers>>>> ipsOfOtherServer = otherServers
.filter(//some filter condition//)
.groupBy(OtherServers::getServerIp)
.collectMap(GroupedFlux::key)
.flatMapMany(map -> Flux.fromIterable(map.entrySet()))
.collectList()
.doOnSuccess(list -> {
if(list.isEmpty()) {
logger.info("list is empty");
}
});
ipsOfOtherServer
.flatMapMany(Flux::fromIterable)
.concatMap(//logic to send message to other server//)
.block()
While de-bugging I notice the first log but after that nothing.If the list is empty I am expecting the second log but that is also not appearing. Any idea what is happening or how to debug.
The Reactor chain might not be executing as expected, potentially due to an issue with asynchronous behavior or how the block() method is used.
Add detailed logging at each stage and determine where the execution halts and why the logs after the first one are missing.
For example, add .doOnError()
to capture and log any errors. Similarly, add .doOnSubscribe()
, .doOnNext()
, and .doOnComplete()
to monitor the flow of execution. If the otherServers.filter()
produces an empty result before reaching groupBy
, the steps (including collectMap
, flatMapMany
, and the doOnSuccess
) might not be executed.
Update code:
logger.info("Starting publish to other servers");
Mono<List<Map.Entry<String, GroupedFlux<String, OtherServers>>>> ipsOfOtherServer = otherServers
.filter(// some filter condition //)
.doOnNext(server -> logger.info("Filtered server: " + server))
.groupBy(OtherServers::getServerIp)
.collectMap(GroupedFlux::key)
.flatMapMany(map -> Flux.fromIterable(map.entrySet()))
.collectList()
.doOnSuccess(list -> {
if (list.isEmpty()) {
logger.info("List is empty after grouping by server IP");
} else {
logger.info("Grouped servers: " + list);
}
})
.doOnError(error -> logger.error("Error occurred while collecting server IPs: ", error));
ipsOfOtherServer
.flatMapMany(Flux::fromIterable)
.concatMap(// logic to send message to other server //)
.doOnComplete(() -> logger.info("Completed sending messages to all servers"))
.doOnError(error -> logger.error("Error occurred during message sending: ", error))
.block();