rust, concurrency, rust-tokio, rust-futures

When to use FuturesOrdered over a Vec of futures?


I have two approaches to the same function, and am trying to see which is more idiomatic or performant.

The goal of the query_many_points() function is to take in two same-size arrays and concurrently send them to an API via the query_point() function, returning a vector of Response values - a custom struct containing data deserialized from the API. The order of the API responses must match the order of the inputs exactly. I believe both approaches achieve this.
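
For context, the snippets below rely on a few definitions I've left out. They look roughly like this (the TASKS_LIMIT value, the Response fields, and the exact query_point signature are placeholders):

use std::sync::Arc;

use futures::stream::{FuturesOrdered, TryStreamExt};
use tokio::{runtime::Runtime, sync::Semaphore};

// Cap on the number of concurrent query tasks (placeholder value)
const TASKS_LIMIT: usize = 16;

// Data deserialized from the API response (fields omitted)
struct Response;

// Blocking call that queries the API for a single (x, y) point
fn query_point(x: f64, y: f64, agent: &ureq::Agent) -> anyhow::Result<Response> {
    todo!()
}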

I originally wrote this by collecting the task handles in a Vec, then iterating over it to await and unwrap the results in order.

Approach 1: Vec of handles

fn query_many_points(xs: &[f64], ys: &[f64]) -> anyhow::Result<Vec<Response>> {
    let agent = ureq::agent();
    let semaphore = Arc::new(Semaphore::new(TASKS_LIMIT));

    // Concurrently query all given coordinates, limited to TASKS_LIMIT concurrent tasks
    // Returned Vec<Response> preserves order of input coordinates
    let runtime = Runtime::new()?;
    runtime.block_on(async {
        // Create a vector of task handles, preserving order with input coordinates
        let handles = xs
            .iter()
            .zip(ys.iter())
            .map(|(&x, &y)| {
                // Future that acquires a permit from the semaphore when one is free
                let permit = semaphore.clone().acquire_owned();
                let agent = agent.clone();
                // Spawn new task and query point, returning handle containing Result<Response>
                tokio::spawn(async move {
                    // Wait for the permit before querying so that at most
                    // TASKS_LIMIT queries are in flight at once
                    let permit = permit.await.expect("semaphore closed");
                    let result = query_point(x, y, &agent);
                    drop(permit);
                    result
                })
            })
            .collect::<Vec<_>>(); // this line is where the approaches start to differ

        // Await all tasks to complete in order and collect into Vec<Response>
        let mut responses = Vec::with_capacity(handles.len());
        for handle in handles {
            responses.push(handle.await??);
        }

        Ok(responses)
    })
}

I then found out about futures::stream::FuturesOrdered and gave it a whirl, coming up with a second solution.

Approach 2: FuturesOrdered

fn query_many_points(xs: &[f64], ys: &[f64]) -> anyhow::Result<Vec<Response>> {
    let agent = ureq::agent();
    let semaphore = Arc::new(Semaphore::new(TASKS_LIMIT));

    // Concurrently query all given points, limited to TASKS_LIMIT concurrent tasks
    // Returned Vec<Response> preserves order of input points
    let runtime = Runtime::new()?;
    runtime.block_on(async {
        // Create a FuturesOrdered of task handles
        let handles = xs
            .iter()
            .zip(ys.iter())
            .map(|(&x, &y)| {
                // Future that acquires a permit from the semaphore when one is free
                let permit = semaphore.clone().acquire_owned();
                let agent = agent.clone();
                // Spawn new task and query point, returning handle containing Result<Response>
                tokio::spawn(async move {
                    // Wait for the permit before querying so that at most
                    // TASKS_LIMIT queries are in flight at once
                    let permit = permit.await.expect("semaphore closed");
                    let result = query_point(x, y, &agent);
                    drop(permit);
                    result
                })
            })
            .collect::<FuturesOrdered<_>>(); // this line is where the approaches start to differ

        // Await completion of all tasks in order and collect into Vec<Response>
        handles.try_collect::<Vec<_>>().await?.into_iter().collect()
    })
}

Which approach is best? Are there any other improvements that could be made? Both preserve order exactly the same, right?


Solution

  • Consider the case where you don't call tokio::spawn and just return the future itself from map.

    If you iterate through a Vec of futures and call await on each one, only one future will start at a time, and the next future will only start when the previous one has finished.

    If you use a FuturesOrdered, it will poll multiple futures at a time, potentially allowing for more concurrency, since one future can make progress while another is waiting (see the sketch at the end of this answer).


    In your specific example, it won't matter - you spawn a task for each future, tokio will begin executing them on its own (potentially on other threads), and the futures you're joining on don't do any work beyond "wait until the task is done". You may as well just use the Vec.
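
    To make that concrete, here is a small self-contained sketch (not your code - it uses tokio::time::sleep in place of the HTTP call, and needs tokio's rt, macros, and time features plus the futures crate). Awaiting a Vec of un-spawned futures one by one takes roughly the sum of the delays, while driving the same futures through a FuturesOrdered takes roughly the longest single delay, and both produce results in input order:

    use std::time::{Duration, Instant};

    use futures::stream::{FuturesOrdered, StreamExt};

    // Stand-in for query_point: an async delay instead of a blocking HTTP call
    async fn slow_double(x: u64) -> u64 {
        tokio::time::sleep(Duration::from_millis(100)).await;
        x * 2
    }

    #[tokio::main]
    async fn main() {
        // Awaiting a Vec of futures one by one: each future is only polled after
        // the previous one has finished, so three 100 ms futures take ~300 ms
        let start = Instant::now();
        let mut sequential = Vec::new();
        for fut in (1..=3u64).map(slow_double).collect::<Vec<_>>() {
            sequential.push(fut.await);
        }
        println!("Vec:            {:?} in {:?}", sequential, start.elapsed());

        // FuturesOrdered polls all three futures concurrently, so the whole batch
        // takes ~100 ms, and the outputs still come back in input order
        let start = Instant::now();
        let concurrent = (1..=3u64)
            .map(slow_double)
            .collect::<FuturesOrdered<_>>()
            .collect::<Vec<_>>()
            .await;
        println!("FuturesOrdered: {:?} in {:?}", concurrent, start.elapsed());
    }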