project-reactor, spring-webflux, socket.io-java-client

Reactor Flux proxy for Socket.IO-client Java


I'm implementing a Spring WebFlux endpoint that should receive data from the Socket.IO Java client.

I don't understand how to collect the incoming data into a Flux stream. Can I somehow create a new Flux and feed the incoming events into it? Thanks for any advice.

@GetMapping(value = "/streaming", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<MyResource> getStreaming() {

    URI uri = URI.create("http://localhost/socket.io"); // client
    Socket socket = IO.socket(uri);

    socket.on("event", args -> {
        JSONObject obj = (JSONObject) args[0];
        MyResource resource = MyResource.create(obj);

        // how to put this resource into the Flux stream?
    });

    return fluxStreamOfResources;

}

Solution

  • You can use Flux.create() to generate a Flux from an event listener.

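    Flux.<MyResource>create(emitter -> {

         URI uri = URI.create("http://localhost/socket.io"); // client
         Socket socket = IO.socket(uri);

         // push every incoming event into the Flux
         socket.on("event", args -> {
           JSONObject obj = (JSONObject) args[0];
           MyResource resource = MyResource.create(obj);
           emitter.next(resource);
         });

         // subscribe to error events
         socket.on(Socket.EVENT_CONNECT_ERROR, args -> {
           // the payload is usually an Exception; fall back to a generic one otherwise
           Throwable cause = (args.length > 0 && args[0] instanceof Throwable)
                   ? (Throwable) args[0]
                   : new IllegalStateException("Socket.IO connection error");
           emitter.error(cause);
         });

         // unsubscribe from events when the client cancels
         emitter.onDispose(() -> {
             socket.off();        // remove all listeners registered above
             socket.disconnect(); // disconnect from socket
         });

         // IO.socket() only creates the socket; listeners fire once it is connected
         socket.connect();
     });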
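
    To see the Flux.create() bridge and its cleanup in isolation, here is a minimal sketch that needs only reactor-core on the classpath. FakeEventSource is a hypothetical stand-in for the Socket.IO Socket (it is not part of any library), and bridge() is an illustrative helper name. It registers a listener per subscriber and removes it again when the subscriber cancels.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import reactor.core.publisher.Flux;

class ListenerBridgeSketch {

    /** Hypothetical stand-in for a callback-based client such as a Socket.IO Socket. */
    static class FakeEventSource {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        void on(Consumer<String> listener) { listeners.add(listener); }

        void off(Consumer<String> listener) { listeners.remove(listener); }

        // Delivers a payload to every currently registered listener.
        void fire(String payload) { listeners.forEach(l -> l.accept(payload)); }

        int listenerCount() { return listeners.size(); }
    }

    /** Bridges the callback API to a Flux; the lambda runs once per subscriber. */
    static Flux<String> bridge(FakeEventSource source) {
        return Flux.create(sink -> {
            Consumer<String> listener = sink::next;     // forward each event downstream
            source.on(listener);
            sink.onDispose(() -> source.off(listener)); // runs on cancel or termination
        });
    }

    public static void main(String[] args) {
        FakeEventSource source = new FakeEventSource();
        List<String> received = new CopyOnWriteArrayList<>();

        // take(2) cancels the subscription after two items, which triggers onDispose.
        bridge(source).take(2).subscribe(received::add);

        source.fire("a");
        source.fire("b");
        source.fire("c"); // fired after cancellation, so no listener is left to receive it

        System.out.println(received);
        System.out.println(source.listenerCount());
    }
}
```

    In a WebFlux controller the same cancellation happens when the HTTP client disconnects, so the onDispose callback is the place to release the socket.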