java spring websocket spring-webflux project-reactor

How to increase the message limit for a WebSocketSession


I wrote a simple chat service with Spring WebFlux (Reactor). The WebSocketHandler part looks like this:

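import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class ChatWebSocketHandler implements WebSocketHandler {
    // Assumed type: the question does not show how messageValidationService is declared.
    private final MessageValidationService messageValidationService;
    private final Sinks.Many<String> eventPublisher = Sinks.many().unicast().onBackpressureBuffer();
    private final Flux<String> outputEvents;

    public ChatWebSocketHandler(MessageValidationService messageValidationService) {
        this.messageValidationService = messageValidationService;
        // Assumption: the undefined `messages` in the original refers to the sink's Flux.
        // Replay the last 10 messages to each new subscriber.
        this.outputEvents = eventPublisher.asFlux()
                .replay(10)
                .autoConnect();
    }

    @Override
    public Mono<Void> handle(final WebSocketSession session) {
        return session.receive()
                .log()
                .map(WebSocketMessage::getPayloadAsText)
                .filter(messageValidationService::isMessageValid)
                .doOnNext(eventPublisher::tryEmitNext)
                .zipWith(session.send(outputEvents.map(session::textMessage)))
                .then();
    }
}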

This works fine only for the first 32 messages: messages arrive and are sent back as expected, but after 32 messages the server stops reacting. There are no logs (even at debug level) after the 32nd message. If the client reconnects, it can send another 32 messages.

The output of .log() looks like this:

2023-10-04T14:33:37.317+03:00  INFO 15812 --- [ctor-http-nio-2] reactor.Flux.Peek.1                      : onSubscribe(FluxPeek.PeekSubscriber)
2023-10-04T14:33:37.320+03:00  INFO 15812 --- [ctor-http-nio-2] reactor.Flux.Peek.1                      : request(32)
2023-10-04T14:33:37.383+03:00  INFO 15812 --- [ctor-http-nio-2] reactor.Flux.Peek.1                      : onNext("text")
2023-10-04T14:33:37.383+03:00  INFO 15812 --- [ctor-http-nio-2] reactor.Flux.Peek.1                      : onNext("text")
2023-10-04T14:33:37.383+03:00  INFO 15812 --- [ctor-http-nio-2] reactor.Flux.Peek.1                      : onNext("text")
... another 29 identical lines

Where is that limit set? Can I increase it?


Solution

  • I think I found the solution. I changed the upstream demand from request(32) to request(unbounded) by adding limitRate:

     return session.receive()
             .log()
             .map(WebSocketMessage::getPayloadAsText)
             .filter(messageValidationService::isMessageValid)
             .doOnNext(eventPublisher::tryEmitNext)
             // Integer.MAX_VALUE makes limitRate request an unbounded amount upstream
             .limitRate(Integer.MAX_VALUE)
             .zipWith(session.send(outputEvents.map(session::textMessage)))
             .then();
    

    It helped.
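
As for where the limit comes from: the request(32) in the log is most likely the default prefetch of zipWith, which Reactor takes from Queues.XS_BUFFER_SIZE (32 unless overridden with the reactor.bufferSize.x system property). session.send(...) returns a Mono<Void> that never emits an item, so zip never forms a pair, never drains its 32-slot buffer, and therefore never requests more from session.receive(). The sketch below is a JDK-only model of that demand mechanism using java.util.concurrent.Flow. It is not Reactor's implementation, and RangePublisher, OneShotSubscriber, and delivered are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** JDK-only model of Reactive Streams demand; not Reactor's implementation. */
class DemandDemo {

    /** Emits 0..count-1, but never more items than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Simplification: non-positive requests are ignored rather than signalled as errors.
                    if (done || n <= 0) return;
                    // Long.MAX_VALUE is treated as "unbounded" demand.
                    demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                    if (emitting) return; // re-entrant call: the running loop sees the new demand
                    emitting = true;
                    while (!done && demand > 0 && next < count) {
                        if (demand != Long.MAX_VALUE) demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next == count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Requests once and never again, like a consumer whose buffer is never drained. */
    static final class OneShotSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        private final long initialDemand;

        OneShotSubscriber(long initialDemand) { this.initialDemand = initialDemand; }

        @Override public void onSubscribe(Flow.Subscription s) { s.request(initialDemand); }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { }
    }

    /** Returns how many of `total` items reach a subscriber that requests `demand` exactly once. */
    static int delivered(int total, long demand) {
        OneShotSubscriber subscriber = new OneShotSubscriber(demand);
        new RangePublisher(total).subscribe(subscriber);
        return subscriber.received.size();
    }

    public static void main(String[] args) {
        System.out.println("request(32): " + delivered(100, 32));
        System.out.println("request(unbounded): " + delivered(100, Long.MAX_VALUE));
    }
}
```

In this model the bounded subscriber receives 32 of 100 items and then stalls, while the unbounded one receives all 100. Note that limitRate(Integer.MAX_VALUE) lifts the limit only on the inbound side: items that zip cannot pair are presumably buffered instead, so memory use may grow over a long session. An alternative described in the Spring WebFlux reference documentation is to turn each direction into a Mono<Void> (for example session.receive()....then() for input and session.send(...) for output) and combine them with Mono.zip(input, output).then(), which avoids pairing inbound messages with the send result at all.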