rust, rust-tokio, rust-warp

Loop through an Arc and pop values in a different thread


I'm trying to implement shared state (an Arc<Mutex<Vec<_>>>) for a Warp route. Given this main function:

#[tokio::main]
async fn main() {
    let ack_vec: Vec<api::Acknowledgement> = Vec::new();
    let arc = Arc::new(Mutex::new(ack_vec));

    // GET /call/:id
    let call = warp::get()
        .and(warp::path("call"))
        .and(warp::path::param())
        .map({
            let queue = arc.clone(); // clone handle outside move

            move |id: u32| {
                let ack = api::Acknowledgement::new(id);
                let mut queue = queue.lock().unwrap();
                queue.push(ack);
                println!("pushed to queue: {:?}", queue);

                Ok(warp::reply::json(&id))
            }
        });

    let routes = call
        .with(warp::trace::request());

    warp::serve(routes).run(([127, 0, 0, 1], 4000)).await;

    // problem starts here
    loop {
        let queue = arc.clone(); // clone the handle
        let mut queue = queue.lock().unwrap();

        println!("popped from queue: {:?}", queue.pop());
    }
}

If I hit the /call route, I can see that the Acknowledgement is added to the queue.

The loop at the end is supposed to pop each value from the queue as it is added, but it never prints anything.

I've tried wrapping the loop in tokio::spawn and still get no result (nothing is printed to stdout).

What am I doing wrong?


Solution

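  • Thanks to @Stargateur's comment I realised what I was doing wrong: warp::serve(...).run(...).await only completes when the server shuts down, so the loop placed after it was never reached. Spawning the loop on its own thread before starting the server fixes that. Here's a working prototype in case anyone else gets stuck at a similar place.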

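    // Spawn the consumer on its own OS thread before starting the server.
    // `arc` is moved into the closure; the route already holds its own clone.
    std::thread::spawn(move || {
        let two = std::time::Duration::from_secs(2);
        loop {
            std::thread::sleep(two);

            // The guard is dropped at the end of each iteration, so the lock
            // is not held while the thread sleeps.
            let mut queue = arc.lock().unwrap();
            println!("queue: {:?}", queue);

            // Vec::pop takes from the end, so the newest entry comes out first.
            while let Some(ack) = queue.pop() {
                println!("ack popped: {:?}", ack);
            }
        }
    });

    // This await only returns when the server stops, so it must come last.
    warp::serve(routes).run(([127, 0, 0, 1], 4000)).await;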
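
For anyone who wants to try the pattern without warp or tokio, here is a minimal std-only sketch of the same idea. It is not the code from the question: the producer thread only stands in for the route handler, plain u32 ids stand in for api::Acknowledgement, and drain_queue is a hypothetical helper name. It uses std::mem::take instead of repeated pop calls, so the lock is held only for the swap and the values come out in the order they were pushed.

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: takes everything currently queued and releases
/// the lock as soon as the swap is done.
fn drain_queue(queue: &Mutex<Vec<u32>>) -> Vec<u32> {
    // mem::take leaves an empty Vec behind and returns the old contents,
    // preserving insertion order.
    std::mem::take(&mut *queue.lock().unwrap())
}

fn main() {
    let queue: Arc<Mutex<Vec<u32>>> = Arc::new(Mutex::new(Vec::new()));

    // Producer: stands in for the route handler pushing ids.
    let producer = {
        let queue = Arc::clone(&queue);
        thread::spawn(move || {
            for id in 1..=5u32 {
                queue.lock().unwrap().push(id);
                thread::sleep(Duration::from_millis(10));
            }
        })
    };

    // Consumer: polls until all five values have been seen.
    let consumer = {
        let queue = Arc::clone(&queue);
        thread::spawn(move || {
            let mut seen = Vec::new();
            while seen.len() < 5 {
                for id in drain_queue(&queue) {
                    println!("ack popped: {}", id);
                    seen.push(id);
                }
                thread::sleep(Duration::from_millis(5));
            }
            seen
        })
    };

    producer.join().unwrap();
    let seen = consumer.join().unwrap();
    assert_eq!(seen, vec![1, 2, 3, 4, 5]);
}
```

Polling with a sleep keeps the sketch close to the prototype above. If the consumer should wake up as soon as a value arrives, a channel such as std::sync::mpsc or tokio::sync::mpsc is probably a better fit than a shared Vec.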