After half a year of Rust coding, today I am for the first time truly baffled by a compiler error. Here is the condensed version of a library concept I was working on:
use futures::{Stream, StreamExt, pin_mut, stream::Next};
pub trait Stream2<T>: Stream<Item = T> {
fn next2(&mut self) -> Next<'_, Self>
where
Self: Unpin,
{
self.next()
}
}
pub struct WrappedStream<S>(S);
//impl<S: Unpin> Unpin for WrappedStream<S> {}
impl<T, S> Stream for WrappedStream<S>
where
S: Stream<Item = T>,
{
type Item = T;
fn poll_next(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// The implementation here is kind of irrelevant since our issue is at compile time
std::task::Poll::Ready(None)
}
}
impl<T, S> Stream2<T> for WrappedStream<S> where S: Stream<Item = T> {}
#[tokio::main]
async fn main() {
let stream = WrappedStream(futures::stream::once(async move { 1 }));
pin_mut!(stream);
// This compiles
while let Some(item) = stream.next().await {}
let stream = WrappedStream(futures::stream::once(async move { 1 }));
pin_mut!(stream);
// This doesn't compile??
while let Some(item) = stream.next2().await {}
I am highly confused by the fact that StreamExt::next
and Stream2::next2
have seemingly the exact same signature and yet they behave differently. I played around with this a lot, one observation is that we can pin the inner stream before assigning it to the WrappedStream
newtype, however in practice this will make the API less generic. I am really curious to fully understand the underlying issue here.
I figured out a fix, but I don't know how it works. I noticed that Stream
has some extra implementations for DerefMut + Unpin
(e.g. Pin
, Box
as I understand).
Adding an analogous impl lets this compile:
impl<T, P> Stream2<T> for Pin<P>
where
P: DerefMut + Unpin,
<P as Deref>::Target: Stream2<T>,
{
}
//impl<S, T> Stream2<T> for &mut S where S: Stream2<T> + Unpin + ?Sized {}
Now my question is really, how does this magic work? I assume there is some auto-resolution of the next()
call for Pin<&mut Self>
? I would really like to understand exactly how these two calls are resolved.
When you call stream.next2()
the compiler doesn't find a Stream2<_>
implementation for Pin<&mut WrappedStream<_>>
, which is what stream
is at that point. However, Pin
implements DerefMut
, so deref coercion kicks in: the compiler tries to use that to get a &mut WrappedStream
to use the impl<T, S> Stream2<T> for WrappedStream<S>
implementation, and that (obviously) fails since the T
in WrappedStream<T>
is not Unpin
in this case.
You need to provide an implementation for Pin
as well, which can be a blanket one:
impl<T, S> Stream2<T> for Pin<S>
where
S: DerefMut + Unpin,
S::Target: Stream2<T>,
{
}
Note that futures::stream::Stream
has such an implementation, which is why stream.next()
does not cause an error in your code.