I have a type which semantically behaves like a stream but doesn't actually implement the futures_util::Stream
trait. Instead, it has a method get_next
which repeatedly returns a Future
yielding the next item.
I'm trying to implement an adapter around this in order to use it as a stream but I'm running into some trouble. I created the following tow example:
use futures_util::Stream;
use std::{future::Future, pin::Pin, task::Poll};
struct FutureSource {/* omitted */}
impl FutureSource {
pub fn get_next(&mut self) -> FutureString {
FutureString {
source: self,
important_state: todo!(),
}
}
}
struct FutureString<'a> {
source: &'a mut FutureSource,
important_state: String,
}
impl<'a> Future for FutureString<'a> {
type Output = String;
fn poll(self: Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
unimplemented!()
}
}
struct StreamAdapter<'a> {
future_source: &'a mut FutureSource,
future: Option<FutureString<'a>>,
}
impl<'a> Stream for StreamAdapter<'a> {
type Item = <FutureString<'a> as Future>::Output;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
if self.future.is_none() {
self.future.insert(self.future_source.get_next());
}
let fut = self.future.as_mut().unwrap();
match Pin::new(fut).poll(cx) {
Poll::Ready(value) => {
self.future = None;
Poll::Ready(Some(value))
}
Poll::Pending => Poll::Pending,
}
}
}
This does not compile and gives the following errors:
error[E0499]: cannot borrow `self` as mutable more than once at a time
--> src/main.rs:41:32
|
41 | self.future.insert(self.future_source.get_next());
| ---- ------ ^^^^ second mutable borrow occurs here
| | |
| | first borrow later used by call
| first mutable borrow occurs here
error: lifetime may not live long enough
--> src/main.rs:41:32
|
33 | impl<'a> Stream for StreamAdapter<'a> {
| -- lifetime `'a` defined here
...
37 | mut self: Pin<&mut Self>,
| - let's call the lifetime of this reference `'1`
...
41 | self.future.insert(self.future_source.get_next());
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ argument requires that `'1` must outlive `'a`
error[E0499]: cannot borrow `self` as mutable more than once at a time
--> src/main.rs:43:19
|
33 | impl<'a> Stream for StreamAdapter<'a> {
| -- lifetime `'a` defined here
...
41 | self.future.insert(self.future_source.get_next());
| -----------------------------
| |
| first mutable borrow occurs here
| argument requires that `self` is borrowed for `'a`
42 | }
43 | let fut = self.future.as_mut().unwrap();
| ^^^^ second mutable borrow occurs here
error[E0499]: cannot borrow `self` as mutable more than once at a time
--> src/main.rs:47:17
|
33 | impl<'a> Stream for StreamAdapter<'a> {
| -- lifetime `'a` defined here
...
41 | self.future.insert(self.future_source.get_next());
| -----------------------------
| |
| first mutable borrow occurs here
| argument requires that `self` is borrowed for `'a`
...
47 | self.future = None;
| ^^^^ second mutable borrow occurs here
My understanding of the errors:
cannot borrow `self` as mutable more than once at a time
self
in self.future_source.get_next()
has lifetime 'a
which necessarily means the lifetime of the self
borrow must be at least 'a
. This lifetime is beyond the scope of the entire function call which means we cannot borrow self
again at all.lifetime may not live long enough
:self
for 'a
which means that the lifetime of the reference to self
must be at least 'a
. While it's certainly possible, the compiler can't enforce that. I could specify that the reference to self
has lifetime 'a
but that would not align with the trait fn.Is there a way to do this in safe Rust? Or do I need to go unsafe? Or is what I'm trying to do simply unsound regardless of how I go about it?
What you a trying to do is known as problematic in Rust (a self referential struct), see Why can't I store a value and a reference to that value in the same struct?.
However, for futures there is a workaround: the trick is to wrap the value in a future that consumes the value and on completion, returns both the result and the value back. For example:
async fn future(mut source: FutureSource) -> (String, FutureSource) {
let result = source.get_next().await;
(result, source)
}
struct StreamAdapter {
future: Pin<Box<dyn Future<Output = (String, FutureSource)>>>,
}
impl StreamAdapter {
fn new(source: FutureSource) -> Self {
Self { future: Box::pin(future(source)) }
}
}
impl Stream for StreamAdapter {
type Item = String;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let (result, source) = ready!(self.future.as_mut().poll(cx));
self.future = Box::pin(future(source));
Poll::Ready(Some(result))
}
}
If you are concerned about the performance of heap-allocating the future every time, and until Type Alias Impl Trait (type F = impl Future
) is ready, you can use something like tokio_util::sync::ReusableBoxFuture
, that allows you to allocate only once.