rustconcurrencyrust-actix

Multi-threading in async rust - why is my code failing to parallelize?


I'm trying to intentionally exhaust an API limit (900 calls) by running the following function:

#[get("/exhaust")]
pub async fn exhaust(_pool: web::Data<PgPool>, config: web::Data<Arc<Settings>>) -> impl Responder {
    let mut handles = vec![];

    for i in 1..900 {
        let inner_config = config.clone();
        let handle = thread::spawn(move || async move {
            println!("running thread {}", i);
            get_single_tweet(inner_config.as_ref().deref(), "1401287393228038149")
                .await
                .unwrap();
        });
        handles.push(handle);
    }

    for h in handles {
        h.join().unwrap().await;
    }

    HttpResponse::Ok()

My machine has 16 cores so I expected the above to run 16x faster than a single-threaded function, but it doesn't. In fact it runs exactly as slow as the single-threaded version.

Why is that? What am I missing?

Note: the move || async move part looks a little weird to me, but I got there by following suggestions from the compiler. It wouldn't let me put async next to the first move due to async closures being unstable. Could that be the issue?


Solution

  • This code will indeed run your async blocks synchronously. An async block creates a type that implements Future, but one thing to know is that Futures don't start running on their own, they have to either be await-ed or given to an executor to run.

    Calling thread::spawn with a closure that returns a Future as you've done will not execute them; the threads are simply creating the async block and returning. So the async blocks aren't actually being executed until you await them in the loop over handles, which will process the futures in order.

    One way to fix this is to use join_all from the futures crate to run them all simultaneously.

    let mut futs = vec![];
    
    for i in 1..900 {
        let inner_config = config.clone();
        futs.push(async move {
            println!("running thread {}", i);
            get_single_tweet(inner_config.as_ref().deref(), "1401287393228038149")
                .await
                .unwrap();
        });
    }
    
    futures::future::join_all(futs).await;