websocketrustrust-warp

Forwarding messages between BusReader and warp WebSocket sink leaves unflushed buffer?


I'm trying to create a websocket server (and HTTP, hence using warp) that forwards messages from one source (an MQTT subscription) to many clients over websockets. This mostly seems to work fine, aside from the clients not receiving the first websocket message until the second message has been broadcast; then always staying one message behind until finally never receiving the last message. To me, the problem seems to be a send buffer that never fully flushes in the ws_connected function.

I use futures::stream::iter to turn the BusReader into a stream, then map the messages into the required Ok(Message) type that the WebSocket Sink requires. The official warp websocket chat example uses a similar construct for forwarding between streams: https://github.com/seanmonstar/warp/blob/42fd14fdab8145d27ae770fe4b5c843a99bc2a44/examples/websockets_chat.rs#L62.

In this pared-down example, the server broadcasts the values 0-9 over the bus. A websocat client (and JS websocket client in Firefox) receives the messages 0-8 -- albeit always one behind the broacast and server's stdout -- but 9 never arrives. The async_bus_print function receives all of the values on time, however, which proves that the messages are at least passing through the Bus with no problem.

Here is the server process's output:

async bus_print started
0
async bus: "0"
1
async bus: "1"
2
async bus: "2"
3
async bus: "3"
4
async bus: "4"
5
async bus: "5"
6
async bus: "6"
7
async bus: "7"
8
async bus: "8"
9
async bus: "9"

The code in question:

use std::{sync::{Arc, RwLock}, thread};

use bus::{Bus, BusReader};
use futures::StreamExt;
use warp::ws::{Message, WebSocket};
use warp::Filter;

async fn ws_connected(ws: WebSocket, rx: BusReader<String>) {
    let (ws_tx, _ws_rx) = ws.split();

    thread::spawn(|| {
        futures::executor::block_on(async move {
            if let Err(e) = futures::stream::iter(rx.into_iter())
                .map(|ws_msg| Ok(Message::text(ws_msg)))
                .forward(ws_tx)
                .await
            {
                eprintln!("Goodbye, websocket user: {}", e);
            }
        });
    });
}

async fn async_bus_print(mut rx: BusReader<String>) {
    println!("async bus_print started");
    thread::spawn(||
        futures::executor::block_on(async move {
            while let Some(msg) = futures::stream::iter(rx.iter()).next().await {
                println!("async bus: {:#?}", msg);
            }
        })
    );
}

async fn bus_tx(tx: Arc<RwLock<Bus<String>>>) {
    for i in 0..10u8 {
        tx.write().unwrap().broadcast(format!("{}", i));
        println!("{}", i);
        tokio::time::delay_for(std::time::Duration::from_secs(1)).await;
    }
}

#[tokio::main]
async fn main() {
    let bus = Arc::new(RwLock::new(Bus::new(20)));
    let bus2 = Arc::clone(&bus);
    let rx = warp::any().map(move || bus2.write().unwrap().add_rx());
    let rx2 = bus.write().unwrap().add_rx();

    let ws = warp::path("ws")
        .and(warp::ws())
        .and(rx)
        .map(|ws: warp::ws::Ws, rx| ws.on_upgrade(move |socket| ws_connected(socket, rx)));

    futures::join!(
        async_bus_print(rx2),
        bus_tx(bus),
        warp::serve(ws).run(([127, 0, 0, 1], 3030)),
    );
}

How can I track down and eliminate this "buffering" issue?

Hopefully I've explained it well enough. Please let me know if I can provide any more information. Thanks for any help.


Solution

  • While I still haven't been able to figure out the root cause of the unflushed data, thanks to some helpful people on reddit, I have some better, alternate solutions.

    It seems to work fine if you don't split the WebSocket in the first place:

    async fn ws_connected(ws: WebSocket, rx: BusReader<String>) {
        thread::spawn(|| {
            futures::executor::block_on(async move {
                if let Err(e) = futures::stream::iter(rx.into_iter())
                    .map(|ws_msg| Ok(Message::text(ws_msg)))
                    .forward(ws)
                    .await
                {
                    eprintln!("Goodbye, websocket user: {}", e);
                }
            });
        });
    }
    

    It's possible to still split the stream and send the data (instead of forward) to the tx side of the WebSocket if you include futures::SinkExt in the code:

    async fn ws_connected(ws: WebSocket, mut rx: BusReader<String>) {
        use futures::SinkExt;
    
        let (mut ws_tx, ws_rx) = ws.split();
    
        while let Ok(msg) = rx.recv() {
            ws_tx.send(Message::text(msg)).await.unwrap();
            // `send` automatically flushes the sink
        }
    }
    

    Ultimately, I think tokio's broadcast channel is a better fit for my async multi-producer, multi-consumer channel communication needs than bus:

    use futures::StreamExt;
    use tokio::sync::broadcast;
    use warp::ws::{Message, WebSocket};
    use warp::Filter;
    
    async fn ws_connected(ws: WebSocket, recv: broadcast::Receiver<String>) {
        let (ws_tx, _ws_rx) = ws.split();
    
        recv.map(|s| Ok(Message::text(s.unwrap())))
            .forward(ws_tx)
            .await
            .unwrap();
    }
    
    async fn async_bus_print(mut recv: broadcast::Receiver<String>) {
        println!("async bus_print started");
        while let Some(msg) = recv.next().await {
            println!("async bus: {:#?}", msg.unwrap());
        }
    }
    
    async fn bus_tx(tx: broadcast::Sender<String>) {
        for i in 0..10u8 {
            tx.send(format!("{}", i)).unwrap();
            println!("{}", i);
            tokio::time::delay_for(std::time::Duration::from_secs(1)).await;
        }
    }
    
    #[tokio::main]
    async fn main() {
        let (send, recv) = broadcast::channel::<String>(32);
        let send2 = send.clone();
        let rx = warp::any().map(move || send2.subscribe());
    
        let ws = warp::path("ws")
            .and(warp::ws())
            .and(rx)
            .map(|ws: warp::ws::Ws, recv| ws.on_upgrade(move |socket| ws_connected(socket, recv)));
    
        let (abp, tx, warp) = futures::join!(
            tokio::spawn(async_bus_print(recv)),
            tokio::spawn(bus_tx(send)),
            tokio::spawn(warp::serve(ws).run(([127, 0, 0, 1], 3030))),
        );
    
        abp.unwrap();
        tx.unwrap();
        warp.unwrap();
    }