asynchronousrustfutureself-reference

Storing Future next to source of said Future


I have a type which semantically behaves like a stream but doesn't actually implement the futures_util::Stream trait. Instead, it has a method get_next which repeatedly returns a Future yielding the next item.

I'm trying to implement an adapter around this in order to use it as a stream but I'm running into some trouble. I created the following tow example:

use futures_util::Stream;
use std::{future::Future, pin::Pin, task::Poll};

struct FutureSource {/* omitted */}

impl FutureSource {
    pub fn get_next(&mut self) -> FutureString {
        FutureString {
            source: self,
            important_state: todo!(),
        }
    }
}

struct FutureString<'a> {
    source: &'a mut FutureSource,
    important_state: String,
}

impl<'a> Future for FutureString<'a> {
    type Output = String;

    fn poll(self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        unimplemented!()
    }
}

struct StreamAdapter<'a> {
    future_source: &'a mut FutureSource,
    future: Option<FutureString<'a>>,
}

impl<'a> Stream for StreamAdapter<'a> {
    type Item = <FutureString<'a> as Future>::Output;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.future.is_none() {
            self.future.insert(self.future_source.get_next());
        }
        let fut = self.future.as_mut().unwrap();

        match Pin::new(fut).poll(cx) {
            Poll::Ready(value) => {
                self.future = None;
                Poll::Ready(Some(value))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

This does not compile and gives the following errors:

error[E0499]: cannot borrow `self` as mutable more than once at a time
  --> src/main.rs:41:32
   |
41 |             self.future.insert(self.future_source.get_next());
   |             ----        ------ ^^^^ second mutable borrow occurs here
   |             |           |
   |             |           first borrow later used by call
   |             first mutable borrow occurs here

error: lifetime may not live long enough
  --> src/main.rs:41:32
   |
33 | impl<'a> Stream for StreamAdapter<'a> {
   |      -- lifetime `'a` defined here
...
37 |         mut self: Pin<&mut Self>,
   |                       - let's call the lifetime of this reference `'1`
...
41 |             self.future.insert(self.future_source.get_next());
   |                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ argument requires that `'1` must outlive `'a`

error[E0499]: cannot borrow `self` as mutable more than once at a time
  --> src/main.rs:43:19
   |
33 | impl<'a> Stream for StreamAdapter<'a> {
   |      -- lifetime `'a` defined here
...
41 |             self.future.insert(self.future_source.get_next());
   |                                -----------------------------
   |                                |
   |                                first mutable borrow occurs here
   |                                argument requires that `self` is borrowed for `'a`
42 |         }
43 |         let fut = self.future.as_mut().unwrap();
   |                   ^^^^ second mutable borrow occurs here

error[E0499]: cannot borrow `self` as mutable more than once at a time
  --> src/main.rs:47:17
   |
33 | impl<'a> Stream for StreamAdapter<'a> {
   |      -- lifetime `'a` defined here
...
41 |             self.future.insert(self.future_source.get_next());
   |                                -----------------------------
   |                                |
   |                                first mutable borrow occurs here
   |                                argument requires that `self` is borrowed for `'a`
...
47 |                 self.future = None;
   |                 ^^^^ second mutable borrow occurs here

My understanding of the errors:

Is there a way to do this in safe Rust? Or do I need to go unsafe? Or is what I'm trying to do simply unsound regardless of how I go about it?


Solution

  • What you a trying to do is known as problematic in Rust (a self referential struct), see Why can't I store a value and a reference to that value in the same struct?.

    However, for futures there is a workaround: the trick is to wrap the value in a future that consumes the value and on completion, returns both the result and the value back. For example:

    async fn future(mut source: FutureSource) -> (String, FutureSource) {
        let result = source.get_next().await;
        (result, source)
    }
    
    struct StreamAdapter {
        future: Pin<Box<dyn Future<Output = (String, FutureSource)>>>,
    }
    
    impl StreamAdapter {
        fn new(source: FutureSource) -> Self {
            Self { future: Box::pin(future(source)) }
        }
    }
    
    impl Stream for StreamAdapter {
        type Item = String;
    
        fn poll_next(
            mut self: Pin<&mut Self>,
            cx: &mut std::task::Context<'_>,
        ) -> Poll<Option<Self::Item>> {
            let (result, source) = ready!(self.future.as_mut().poll(cx));
            self.future = Box::pin(future(source));
            Poll::Ready(Some(result))
        }
    }
    

    If you are concerned about the performance of heap-allocating the future every time, and until Type Alias Impl Trait (type F = impl Future) is ready, you can use something like tokio_util::sync::ReusableBoxFuture, that allows you to allocate only once.