I wrote simple chat service on Spring WebFlux (Reactor). WebSocketHandler part looks like
public class ChatWebSocketHandler implements WebSocketHandler {
private final Flux<String> outputEvents;
private final Sinks.Many<Message> eventPublisher = Sinks.many().unicast().onBackpressureBuffer();
public ChatWebSocketHandler(){
this.outputEvents = Flux.from(messages)
.replay(10)
.autoConnect();
}
@Override
public Mono<Void> handle(final WebSocketSession session) {
return session.receive()
.log()
.map(WebSocketMessage::getPayloadAsText)
.filter(msg -> messageValidationService.isMessageValid(msg.getText()))
.doOnNext(msg -> eventPublisher.tryEmitNext(msg))
.zipWith(session.send(outputEvents.map(session::textMessage)))
.then();
}
}
This is works fine only for first 32 messages, messages are incomes and goes back as expected, but after 32 messages, server are stop reacting. No any logs (even on debug level) after 32th message. If client do reconnect it can sent another 32 messages.
Log of .log() looks like
2023-10-04T14:33:37.317+03:00 INFO 15812 --- [ctor-http-nio-2] reactor.Flux.Peek.1 : onSubscribe(FluxPeek.PeekSubscriber)
2023-10-04T14:33:37.320+03:00 INFO 15812 --- [ctor-http-nio-2] reactor.Flux.Peek.1 : request(32)
2023-10-04T14:33:37.383+03:00 INFO 15812 --- [ctor-http-nio-2] reactor.Flux.Peek.1 : onNext("text")
2023-10-04T14:33:37.383+03:00 INFO 15812 --- [ctor-http-nio-2] reactor.Flux.Peek.1 : onNext("text")
2023-10-04T14:33:37.383+03:00 INFO 15812 --- [ctor-http-nio-2] reactor.Flux.Peek.1 : onNext("text")
... another 29 same lines
Where is set that limit? Could I increase it?
I guess I found the solution. I changed prefetch from request(32) to request(unbound) as:
return session.receive()
.log()
.map(WebSocketMessage::getPayloadAsText)
.filter(msg -> messageValidationService.isMessageValid(msg.getText()))
.doOnNext(msg -> eventPublisher.tryEmitNext(msg))
.limitRate(Integer.MAX_VALUE)
.zipWith(session.send(outputEvents.map(session::textMessage)))
.then();
It's helped.