So, I have this code, which runs fine in a single thread:
```rust
pub async fn insert_foos(
    self,
    foos: Vec<Foo>,
) -> Result<(), Box<dyn std::error::Error>> {
    let chunks = foos.chunks(10_000);
    for chunk in chunks {
        let _ = &self.insert_foo_chunk(chunk).await?;
    }
    Ok(())
}
```
I then tried to make it multi-threaded:
```rust
pub async fn insert_foos(
    self,
    foos: Vec<Foo>,
) -> Result<(), Box<dyn std::error::Error>> {
    let chunks = foos.chunks(10_000);
    let tasks: Vec<_> = chunks
        .map(move |chunk| {
            let pg_repository = self.clone();
            tokio::spawn(async move {
                pg_repository.insert_foo_chunk(chunk).await?;
                Ok::<(), Error>(())
            })
        })
        .collect();
    for task in tasks {
        let _ = task.await?;
    }
    Ok(())
}
```
But then I get this error:
```text
error[E0597]: `foos` does not live long enough
  --> src/db/postgres_repository.rs:18:22
   |
16 |         foos: Vec<Foo>,
   |         ---- binding `foos` declared here
17 |     ) -> Result<(), Box<dyn std::error::Error>> {
18 |         let chunks = foos.chunks(10_000);
   |                      ^^^^---------------
   |                      |
   |                      borrowed value does not live long enough
   |                      argument requires that `foos` is borrowed for `'static`
...
40 |     }
   |     - `foos` dropped here while still borrowed
```
Checking the `Chunks::new` method's implementation:
```rust
impl<'a, T: 'a> Chunks<'a, T> {
    #[inline]
    pub(super) fn new(slice: &'a [T], size: usize) -> Self {
        Self { v: slice, chunk_size: size }
    }
}
```
It's very simple and shows that `v` is actually my `foos`, with the same lifetime.
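A small stdlib-only check of the observation above: each item yielded by `chunks` is a `&[T]` view into the vector's own buffer, so chunking copies no elements, and the views cannot outlive the vector (calling `drop(foos)` while they are still in use is rejected by the borrow checker). In this sketch `u32` stands in for `Foo`, and `borrowed_chunks` / `chunks_alias_original` are hypothetical helper names.

```rust
/// Splits `foos` into borrowed chunks. The returned slices borrow `foos`,
/// so the caller cannot drop `foos` while they are alive.
/// (`u32` stands in for `Foo`; the chunk size is arbitrary.)
fn borrowed_chunks(foos: &[u32], chunk_size: usize) -> Vec<&[u32]> {
    foos.chunks(chunk_size).collect()
}

/// Returns true if chunk `i` starts exactly at offset `i * chunk_size` inside
/// `foos`, i.e. every chunk points into the original buffer and nothing was copied.
fn chunks_alias_original(foos: &[u32], chunk_size: usize) -> bool {
    borrowed_chunks(foos, chunk_size)
        .iter()
        .enumerate()
        .all(|(i, chunk)| std::ptr::eq(chunk.as_ptr(), foos[i * chunk_size..].as_ptr()))
}
```

For example, chunking 25 elements by 10 gives three slices, the last holding 5 elements, all pointing into the same allocation.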
So I think I understand: because of the `async move`, my `chunks` is moved out of the function, but `foos` itself only lives until the end of the function, which obviously creates this lifetime issue. Am I right?
I've been scratching my head since yesterday and can't think of how to solve this. I don't want to clone `foos`, because this vector contains around 500 MB of data, and the calls to `insert_foos` are already multi-threaded (around 6 GB of data loaded in total; there is not enough RAM to clone all of it).
I don't care at all about `foos` once the chunks are inserted, so I don't actually need this variable to live until the end of the function. I just don't know how to tell the compiler that...
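One way to give up `foos` without cloning any element, sketched here as an alternative rather than as the approach taken in the answer: move the whole `Vec` into an `Arc` and hand each spawned worker a clone of the `Arc` plus an index range. Cloning an `Arc` only increments a reference count, and the workers own everything they capture, so they satisfy `'static`. This stdlib-only sketch uses `std::thread::spawn`, which, like `tokio::spawn`, requires `'static`; `u64` and the summing work are placeholders for `Foo` and `insert_foo_chunk`, and `sum_chunks_in_threads` is a hypothetical name.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for a multi-threaded `insert_foos`: `u64` replaces `Foo`,
/// and summing a chunk replaces `insert_foo_chunk`.
fn sum_chunks_in_threads(foos: Vec<u64>, chunk_size: usize) -> u64 {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    // Moving the Vec into an Arc moves only its pointer/length/capacity header;
    // the element buffer is neither copied nor cloned.
    let foos = Arc::new(foos);
    let handles: Vec<thread::JoinHandle<u64>> = (0..foos.len())
        .step_by(chunk_size)
        .map(|start| {
            let end = (start + chunk_size).min(foos.len());
            // Each thread owns its own Arc (cloning it only bumps a reference count),
            // so the closure borrows nothing local and satisfies `'static`.
            let foos = Arc::clone(&foos);
            thread::spawn(move || foos[start..end].iter().sum::<u64>())
        })
        .collect();
    // Release our own handle now; the buffer is freed when the last remaining
    // Arc (possibly held by a still-running thread) is dropped.
    drop(foos);
    handles
        .into_iter()
        .map(|h| h.join().expect("worker thread panicked"))
        .sum()
}
```

The same shape should carry over to `tokio::spawn` (each task calling something like `insert_foo_chunk(&foos[start..end])` on a cloned repository), assuming `Foo` is `Send + Sync`; that adaptation is untested here.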
Thanks a lot for your help.
`tokio::spawn` only accepts `'static` futures, not futures that borrow from local variables like `chunks`. That's because a spawned task runs independently of the function that spawned it: nothing forces it to finish before your function returns, so it could outlive the local variables it borrows.
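For comparison, the standard library's scoped threads (`std::thread::scope`, stable since Rust 1.63) show the other side of the same rule: because the scope joins every thread before it returns, those threads may borrow chunks of a local vector, which a plain `std::thread::spawn` (like `tokio::spawn`) rejects. This is a synchronous, stdlib-only illustration with `u64` and summing as placeholders, not a drop-in replacement for the async code; `sum_chunks_scoped` is a hypothetical name.

```rust
use std::thread;

/// Sums each chunk of `foos` on its own thread. The threads borrow the chunks
/// directly (no Arc, no clone) because `thread::scope` joins them all before returning.
fn sum_chunks_scoped(foos: &[u64], chunk_size: usize) -> u64 {
    thread::scope(|s| {
        let handles: Vec<_> = foos
            .chunks(chunk_size)
            // `s.spawn` accepts a borrowing closure; `thread::spawn` here would not
            // compile, because the closure holds a `&[u64]` into `foos` and is not `'static`.
            .map(|chunk| s.spawn(move || chunk.iter().sum::<u64>()))
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("worker thread panicked"))
            .sum::<u64>()
    })
}
```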
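You can use `futures::future::try_join_all` instead. It awaits all the futures inside `insert_foos` itself, before `foos` is dropped, so they are allowed to borrow from it: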
```rust
// Minimal stand-ins so the example compiles on its own (requires the `futures` crate).
struct Foo;
type Error = std::io::Error;

#[derive(Clone)]
struct Repo;

impl Repo {
    pub async fn insert_foo_chunk(&self, _chunk: &[Foo]) -> Result<(), Error> {
        Ok(())
    }

    pub async fn insert_foos(
        self,
        foos: Vec<Foo>,
    ) -> Result<(), Error> {
        let chunks = foos.chunks(10_000);
        futures::future::try_join_all(
            chunks
                .map(move |chunk| {
                    let pg_repository = self.clone();
                    async move {
                        pg_repository.insert_foo_chunk(chunk).await
                    }
                })
        ).await.map(|_| ()) // discard the Vec<()> of per-chunk results
    }
}
```
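Note how the error type can be simpler now that `spawn`-related errors (tokio's `JoinError`, returned when a spawned task panics or is cancelled) are impossible. Also note that `try_join_all` runs the futures concurrently within the current task rather than on separate threads, and that it returns the first error encountered, dropping the futures that have not finished yet. If you want all inserts to run to completion regardless of errors, like your original code, use `futures::future::join_all` instead.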
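With `join_all`, the awaited value is a `Vec` holding every future's output (here one `Result<(), Error>` per chunk), so the errors have to be gathered afterwards. A minimal stdlib-only sketch of that last step; `collect_errors` is a hypothetical helper, and the tests use `String` in place of the real error type.

```rust
/// Turns the per-chunk results that `join_all` would yield into a single outcome:
/// `Ok(())` if every chunk succeeded, otherwise every error paired with its chunk index.
fn collect_errors<E>(results: Vec<Result<(), E>>) -> Result<(), Vec<(usize, E)>> {
    let errors: Vec<(usize, E)> = results
        .into_iter()
        .enumerate()
        // Keep only the failures, remembering which chunk each one came from.
        .filter_map(|(i, r)| r.err().map(|e| (i, e)))
        .collect();
    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}
```

In the async code this would be applied to the awaited result, e.g. `collect_errors(futures::future::join_all(futs).await)`, where `futs` is the same iterator of futures passed to `try_join_all` above.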