I have two approaches to the same function and am trying to see which is more idiomatic or performant.

The goal of the `query_many_points()` function is to take two same-size slices of coordinates and concurrently send each `(x, y)` pair to an API via the `query_point()` function, returning a vector of `Response`s (a custom struct containing data deserialized from the API). The order of the API responses must match the order of the inputs exactly. I believe both approaches achieve this.
I originally wrote this by collecting the futures in a `Vec`, then iterating over it to `await` and unwrap the results in order.
Approach 1: Vec of handles
```rust
use std::sync::Arc;

use tokio::{runtime::Runtime, sync::Semaphore};

// `Response`, `query_point` and `TASKS_LIMIT` are defined elsewhere.
fn query_many_points(xs: &[f64], ys: &[f64]) -> anyhow::Result<Vec<Response>> {
    let agent = ureq::agent();
    let semaphore = Arc::new(Semaphore::new(TASKS_LIMIT));

    // Concurrently query all given coordinates, limited to TASKS_LIMIT concurrent tasks.
    // The returned Vec<Response> preserves the order of the input coordinates.
    let runtime = Runtime::new()?;
    runtime.block_on(async {
        // Create a vector of task handles, preserving order with the input coordinates
        let handles = xs
            .iter()
            .zip(ys.iter())
            .map(|(&x, &y)| {
                let semaphore = semaphore.clone();
                let agent = agent.clone();
                // Spawn a new task and query the point; the handle yields Result<Response>
                tokio::spawn(async move {
                    // Wait for a permit; it is released when `_permit` is dropped at the end of the task
                    let _permit = semaphore
                        .acquire_owned()
                        .await
                        .expect("semaphore is never closed");
                    // query_point is synchronous (ureq is a blocking client), so it occupies
                    // a runtime worker thread while the request is in flight
                    query_point(x, y, &agent)
                })
            })
            .collect::<Vec<_>>(); // this line is where the approaches start to differ

        // Await all tasks to complete in order and collect into Vec<Response>
        let mut responses = Vec::with_capacity(handles.len());
        for handle in handles {
            responses.push(handle.await??);
        }
        Ok(responses)
    })
}
```
I then found out about `futures::stream::FuturesOrdered` and gave it a whirl, coming up with a second solution.
Approach 2: FuturesOrdered
```rust
use std::sync::Arc;

use futures::stream::{FuturesOrdered, TryStreamExt};
use tokio::{runtime::Runtime, sync::Semaphore};

// `Response`, `query_point` and `TASKS_LIMIT` are defined elsewhere.
fn query_many_points(xs: &[f64], ys: &[f64]) -> anyhow::Result<Vec<Response>> {
    let agent = ureq::agent();
    let semaphore = Arc::new(Semaphore::new(TASKS_LIMIT));

    // Concurrently query all given points, limited to TASKS_LIMIT concurrent tasks.
    // The returned Vec<Response> preserves the order of the input points.
    let runtime = Runtime::new()?;
    runtime.block_on(async {
        // Create a FuturesOrdered of task handles
        let handles = xs
            .iter()
            .zip(ys.iter())
            .map(|(&x, &y)| {
                let semaphore = semaphore.clone();
                let agent = agent.clone();
                // Spawn a new task and query the point; the handle yields Result<Response>
                tokio::spawn(async move {
                    // Wait for a permit; it is released when `_permit` is dropped at the end of the task
                    let _permit = semaphore
                        .acquire_owned()
                        .await
                        .expect("semaphore is never closed");
                    query_point(x, y, &agent)
                })
            })
            .collect::<FuturesOrdered<_>>(); // this line is where the approaches start to differ

        // Await completion of all tasks in order and collect into Vec<Response>
        handles.try_collect::<Vec<_>>().await?.into_iter().collect()
    })
}
```
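The last line of Approach 2 relies on the fact that an iterator of `Result<T, E>` can be collected into a single `Result<Vec<T>, E>`: the `?` after `try_collect` handles a `JoinError`, and, assuming `query_point` returns `anyhow::Result<Response>`, the final `.collect()` yields `Ok` only if every query succeeded, otherwise the first `Err` in input order. A minimal std-only sketch of that conversion, with a hypothetical `collect_all` helper and `String` standing in for the error type:

```rust
/// Hypothetical helper: turns per-item results into one result for the whole batch.
fn collect_all(results: Vec<Result<u32, String>>) -> Result<Vec<u32>, String> {
    // FromIterator for Result: Ok(vec) if every item is Ok, otherwise the first Err in order.
    results.into_iter().collect()
}

fn main() {
    // Ok([1, 2, 3])
    println!("{:?}", collect_all(vec![Ok(1), Ok(2), Ok(3)]));
    // Err("first")
    println!(
        "{:?}",
        collect_all(vec![Ok(1), Err("first".to_string()), Err("second".to_string())])
    );
}
```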
Which approach is best? Are there any other improvements that could be made? Both preserve order exactly the same, right?
Consider the case where you don't call `tokio::spawn` and just return the future itself from `map`.
If you iterate through a `Vec` of futures and call `await` on each one, only one future runs at a time: the next future is not polled until the previous one has finished.
If you use a `FuturesOrdered`, it polls all of the futures concurrently, so one future can make progress while another is waiting (for example, on I/O). The outputs are still yielded in the original order.
In your specific example, it won't matter: you spawn a task for each future, so tokio begins executing them on its own (potentially on another thread), and the futures you're joining on don't do any work beyond "wait until the task is done". You may as well just use the `Vec`.