spring-websocket spring-webflux

How to use a Spring reactive WebSocket and transform it into a Flux stream?


The Spring documentation includes a WebSocketClient example:

WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(new URI("ws://localhost:8080/echo"), session -> {...}).blockMillis(5000);

I'm not sure how to handle the stream of incoming data inside that {...} block.

Specifically: how can I filter the incoming data and turn it into a Flux?

Here is what I want to get.

@GetMapping(value = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<MyRecourse> getStreaming() {

    //  get some data from WebSocket (CoinCap service).
    //  Transform that data into MyRecourse object
    //  Return stream to a client

}

Solution

  • Take a look at the WebSocketSession parameter of the WebSocketHandler.handle() lambda. It exposes the incoming messages as a Flux:

    /**
     * Get the flux of incoming messages.
     */
    Flux<WebSocketMessage> receive();
    

    See Spring WebFlux Workshop for more information.

    UPDATE

    Let's try this!

        Mono<Void> sessionMono =
                client.execute(new URI("ws://localhost:8080/echo"),
                        session ->
                                Mono.empty()
                                        .subscriberContext(Context.of(WebSocketSession.class, session))
                                        .then());
    
        return sessionMono
                .thenMany(
                        Mono.subscriberContext()
                                .flatMapMany(c -> c
                                        .get(WebSocketSession.class)
                                        .receive()))
                .map(WebSocketMessage::getPayloadAsText);
    

    UPDATE 2

    Another option, this time with a blocking subscription:

        EmitterProcessor<String> output = EmitterProcessor.create();
    
        client.execute(new URI("ws://localhost:8080/echo"),
                session ->
                        session.receive()
                                .map(WebSocketMessage::getPayloadAsText)
                                .subscribeWith(output)
                                .then())
                .block(Duration.ofMillis(5000));
    
        return output;
    

    UPDATE 3

    A working Spring Boot application that demonstrates this: https://github.com/artembilan/sandbox/tree/master/websocket-over-webflux

    The main code looks like this:

        EmitterProcessor<String> output = EmitterProcessor.create();
    
        Mono<Void> sessionMono =
                client.execute(new URI("ws://localhost:8080/echo"),
                        session -> session.receive()
                                .map(WebSocketMessage::getPayloadAsText)
                                .subscribeWith(output)
                                .then());
    
        return output.doOnSubscribe(s -> sessionMono.subscribe());
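
    The snippets above return a stream of text payloads. The question also asks about filtering and converting them into MyRecourse objects. One way to approach that is to keep the per-message logic in plain functions and pass them to the Flux operators. The sketch below is only an illustration under assumptions: the "SYMBOL:price" payload format, the fields of MyRecourse, and the PayloadMapping helper are all hypothetical, since neither the CoinCap message format nor the real MyRecourse class appears in the question.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class PayloadMapping {

    /** Hypothetical target type; the real MyRecourse fields are not shown in the question. */
    public static final class MyRecourse {
        public final String symbol;
        public final double price;

        public MyRecourse(String symbol, double price) {
            this.symbol = symbol;
            this.price = price;
        }
    }

    /**
     * Parses one text payload in the ASSUMED "SYMBOL:price" form.
     * Returns an empty Optional for anything that does not match,
     * so malformed messages can be filtered out instead of failing the stream.
     */
    public static Optional<MyRecourse> parse(String payload) {
        if (payload == null) {
            return Optional.empty();
        }
        String[] parts = payload.trim().split(":");
        if (parts.length != 2 || parts[0].isEmpty()) {
            return Optional.empty();
        }
        try {
            return Optional.of(new MyRecourse(parts[0], Double.parseDouble(parts[1])));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    /** The same map-then-filter shape on a plain list, to try the functions without a WebSocket. */
    public static List<MyRecourse> convert(List<String> payloads) {
        return payloads.stream()
                .map(PayloadMapping::parse)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }
}
```

    With Reactor, the same functions should plug into the pipeline after getPayloadAsText, for example .map(PayloadMapping::parse).filter(Optional::isPresent).map(Optional::get), which would turn the Flux<String> into a Flux<MyRecourse> that the controller method can return. A real JSON payload would more likely be parsed with a JSON library than with split().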