
Lifetime error while trying to use chunks of a vector with the Tokio spawn function for multi-threading


So, I have this code, which runs fine in a single thread:

pub async fn insert_foos(
    self,
    foos: Vec<Foo>,
) -> Result<(), Box<dyn std::error::Error>> {
    let chunks = foos.chunks(10_000);

    for chunk in chunks {
        self.insert_foo_chunk(chunk).await?;
    }

    Ok(())
}

I then tried to make it multi-threaded:

pub async fn insert_foos(
    self,
    foos: Vec<Foo>,
) -> Result<(), Box<dyn std::error::Error>> {
    let chunks = foos.chunks(10_000);

    let tasks: Vec<_> = chunks
        .map(move|chunk| {
            let pg_repository = self.clone();
            tokio::spawn(async move {
                pg_repository.insert_foo_chunk(chunk).await?;
                Ok::<(), Error>(())
            })
        })
        .collect();

    for task in tasks {
        let _ = task.await?;
    }

    Ok(())
}

But then I get this error:

error[E0597]: `foos` does not live long enough
  --> src/db/postgres_repository.rs:18:22
   |
16 |         foos: Vec<Foo>,
   |         ---- binding `foos` declared here
17 |     ) -> Result<(), Box<dyn std::error::Error>> {
18 |         let chunks = foos.chunks(10_000);
   |                      ^^^^---------------
   |                      |
   |                      borrowed value does not live long enough
   |                      argument requires that `foos` is borrowed for `'static`
...
40 |     }
   |     - `foos` dropped here while still borrowed

Checking the implementation of the Chunks::new method:

impl<'a, T: 'a> Chunks<'a, T> {
    #[inline]
    pub(super) fn new(slice: &'a [T], size: usize) -> Self {
        Self { v: slice, chunk_size: size }
    }
}

It's very simple and shows that v is just a borrow of my foos, with the same lifetime 'a. So I think I understand: because of the async move, each chunk is moved out of the function into a spawned task, but foos only lives until the end of the function, which creates this lifetime issue. Am I right?
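That reasoning matches how the standard library behaves with plain threads. std::thread::spawn has the same 'static bound as tokio::spawn and rejects a closure that captures a chunk, whereas std::thread::scope (stable since Rust 1.63) accepts it, because the scope joins every thread it spawned before returning, so no thread can outlive foos. Below is a std-only sketch of that swapped-in technique. Foo and process_chunk are hypothetical stand-ins (summing numbers instead of inserting rows). The sketch also checks that a chunk points into the original vector, i.e. chunks copies nothing.

```rust
use std::thread;

// Hypothetical stand-in for the question's Foo.
struct Foo(u64);

// Hypothetical stand-in for insert_foo_chunk: it just sums the chunk.
fn process_chunk(chunk: &[Foo]) -> u64 {
    chunk.iter().map(|f| f.0).sum()
}

fn process_all(foos: &[Foo], chunk_size: usize) -> u64 {
    // thread::scope joins every thread spawned on `s` before it returns,
    // so the closures may borrow foos (through `chunk`) without 'static.
    thread::scope(|s| {
        let handles: Vec<_> = foos
            .chunks(chunk_size)
            .map(|chunk| s.spawn(move || process_chunk(chunk)))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).sum()
    })
}

fn main() {
    let foos: Vec<Foo> = (1..=25).map(Foo).collect();

    // A chunk is a view into foos: its first element is foos[0] itself.
    let first = foos.chunks(10).next().unwrap();
    assert_eq!(first.as_ptr(), foos.as_ptr());

    println!("{}", process_all(&foos, 10)); // prints 325
}
```

This only shows why borrowing is fine once the spawner guarantees a join; tokio::spawn gives no such guarantee, which is exactly why it demands 'static.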

But I've been scratching my head since yesterday and can't figure out how to solve this. I don't want to clone foos: this vector holds around 500 MB of data, and the calls to insert_foos already run on several threads (around 6 GB of data in total), so there isn't enough RAM to clone all of it.

I don't care about foos at all once the chunks are inserted, so it doesn't actually need to live until the end of the function. I just don't know how to tell the compiler that...

Thanks a lot for your help.


Solution

  • tokio::spawn only accepts 'static futures, not futures that borrow from local variables such as foos (which every chunk does). A spawned task is not tied to the function that spawned it: it can keep running after insert_foos has returned and foos has been dropped, so the compiler cannot allow it to hold a borrow of foos.

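    You can use futures::future::try_join_all instead. It polls all the futures from within insert_foos itself (concurrently within the current task, rather than in parallel on separate threads), so they can never outlive foos and are allowed to borrow it: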

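    // Minimal stand-in types so the example compiles on its own
    // (it needs the futures crate as a dependency).
    struct Foo;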
    type Error = std::io::Error;
    #[derive(Clone)]
    struct Repo;
    
    impl Repo {
        pub async fn insert_foo_chunk(&self, _chunk: &[Foo]) -> Result<(), Error> {
            Ok(())
        }
    
        pub async fn insert_foos(
            self,
            foos: Vec<Foo>,
        ) -> Result<(), Error> {
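            let chunks = foos.chunks(10_000);
            
            // Each future borrows foos through chunk. That compiles because
            // try_join_all is awaited right here, before foos is dropped.
            futures::future::try_join_all(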
                chunks
                    .map(move |chunk| {
                        let pg_repository = self.clone();
                        async move {
                            pg_repository.insert_foo_chunk(chunk).await
                        }
                    })
            ).await.map(|_| ())
        }
    }
    

    Note how the error type can be simpler now that spawn-related errors (JoinError) are impossible. Also note that try_join_all returns as soon as one insert fails, dropping (and thereby cancelling) the remaining futures. If you want every insert to run to completion regardless of errors, as in your original code, use futures::future::join_all instead and inspect the returned Vec of Results.
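If the inserts really must run in parallel on several worker threads via tokio::spawn, each spawned future has to own what it uses. One option that avoids copying the data is to move foos into an Arc and hand each task a cheap clone of the Arc plus an index range; the Vec is freed when the last task drops its Arc. The sketch below shows that pattern with std::thread::spawn, which has the same 'static bound as tokio::spawn, so it runs with only the standard library. Foo and process_chunk are hypothetical stand-ins for the question's Foo and insert_foo_chunk, and the pattern requires Foo to be Send + Sync. With Tokio, the thread::spawn(move || ...) call would presumably become tokio::spawn(async move { ... }) with a cloned repository moved in, and the handles would be awaited instead of joined; that adaptation is untested here.

```rust
use std::ops::Range;
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the question's Foo.
struct Foo(u64);

// Hypothetical stand-in for insert_foo_chunk: it just sums the chunk.
fn process_chunk(chunk: &[Foo]) -> u64 {
    chunk.iter().map(|f| f.0).sum()
}

fn process_all(foos: Vec<Foo>, chunk_size: usize) -> u64 {
    // Moving the Vec into an Arc transfers ownership without copying elements.
    let foos = Arc::new(foos);

    // Index ranges equivalent to foos.chunks(chunk_size), but owned ('static).
    let ranges: Vec<Range<usize>> = (0..foos.len())
        .step_by(chunk_size)
        .map(|start| start..(start + chunk_size).min(foos.len()))
        .collect();

    let handles: Vec<_> = ranges
        .into_iter()
        .map(|range| {
            // Cloning the Arc only bumps a reference count.
            let foos = Arc::clone(&foos);
            // thread::spawn requires a 'static closure, like tokio::spawn.
            thread::spawn(move || process_chunk(&foos[range]))
        })
        .collect();

    // The Vec itself is freed once the last Arc clone is dropped.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let foos: Vec<Foo> = (1..=25).map(Foo).collect();
    println!("{}", process_all(foos, 10)); // prints 325
}
```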