
Wrapped stream Unpin behaves unintuitively


After half a year of writing Rust, I am truly baffled by a compiler error for the first time. Here is a condensed version of a library concept I was working on:

use futures::{Stream, StreamExt, pin_mut, stream::Next};

pub trait Stream2<T>: Stream<Item = T> {
    fn next2(&mut self) -> Next<'_, Self>
    where
        Self: Unpin,
    {
        self.next()
    }
}

pub struct WrappedStream<S>(S);

//impl<S: Unpin> Unpin for WrappedStream<S> {}

impl<T, S> Stream for WrappedStream<S>
where
    S: Stream<Item = T>,
{
    type Item = T;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // The implementation here is kind of irrelevant since our issue is at compile time
        std::task::Poll::Ready(None)
    }
}
impl<T, S> Stream2<T> for WrappedStream<S> where S: Stream<Item = T> {}

#[tokio::main]
async fn main() {
    let stream = WrappedStream(futures::stream::once(async move { 1 }));
    pin_mut!(stream);
    // This compiles
    while let Some(item) = stream.next().await {}

    let stream = WrappedStream(futures::stream::once(async move { 1 }));
    pin_mut!(stream);
    // This doesn't compile??
    while let Some(item) = stream.next2().await {}
}


I am confused because StreamExt::next and Stream2::next2 seem to have exactly the same signature, yet they behave differently. I experimented quite a bit. One observation is that pinning the inner stream before wrapping it in the WrappedStream newtype makes the code compile, but in practice that makes the API less generic. I would really like to understand the underlying issue.
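For reference, the pin-first observation can be sketched with std-only stand-ins. Wrapped and NotUnpin are hypothetical names used only for illustration (they are not the types from my code above), and requires_unpin is a helper I made up to make the Unpin requirement visible:

```rust
use std::marker::PhantomPinned;

// Hypothetical stand-in for `WrappedStream<S>`: a plain newtype.
pub struct Wrapped<S>(pub S);

// Hypothetical stand-in for a stream built from an `async` block: it is `!Unpin`.
pub struct NotUnpin(PhantomPinned);

// Compiles only when `T: Unpin`; returns `true` so it can be used in an assertion.
pub fn requires_unpin<T: Unpin>(_value: &T) -> bool {
    true
}

pub fn pinned_inner_is_unpin() -> bool {
    // `Pin<Box<NotUnpin>>` is `Unpin`, so the auto trait makes
    // `Wrapped<Pin<Box<NotUnpin>>>` `Unpin` as well.
    let wrapped = Wrapped(Box::pin(NotUnpin(PhantomPinned)));
    // By contrast, `requires_unpin(&Wrapped(NotUnpin(PhantomPinned)))` is rejected
    // by the compiler, because the newtype inherits `!Unpin` from its field.
    requires_unpin(&wrapped)
}
```

The cost is that every caller has to box (or otherwise pin) the inner value up front, which is the loss of genericity I mentioned.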

I figured out a fix, but I don't know how it works. I noticed that Stream has some extra implementations for DerefMut + Unpin (e.g. Pin, Box as I understand).

Adding an analogous impl lets this compile:

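use std::ops::{Deref, DerefMut};
use std::pin::Pin;

// Forward Stream2 through any pinned pointer whose target already implements it
impl<T, P> Stream2<T> for Pin<P>
where
    P: DerefMut + Unpin,
    <P as Deref>::Target: Stream2<T>,
{
}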

//impl<S, T> Stream2<T> for &mut S where S: Stream2<T> + Unpin + ?Sized {}


Now my question is really, how does this magic work? I assume there is some auto-resolution of the next() call for Pin<&mut Self>? I would really like to understand exactly how these two calls are resolved.


Solution

  • When you call stream.next2(), the compiler does not find a Stream2<_> implementation for Pin<&mut WrappedStream<_>>, which is the type of stream after pin_mut!. Pin implements Deref, so method resolution auto-dereferences the receiver and finds impl<T, S> Stream2<T> for WrappedStream<S>. Calling next2 through that impl needs a &mut WrappedStream<_> (Pin only implements DerefMut when its target is Unpin) and it needs Self: Unpin. Both requirements fail because the S in WrappedStream<S> is not Unpin in this case: it contains the future produced by an async block.

    You need to provide an implementation for Pin as well, which can be a blanket one:

    impl<T, S> Stream2<T> for Pin<S>
    where
        S: DerefMut + Unpin,
        S::Target: Stream2<T>,
    {
    }
    

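    Note that futures::stream::Stream has such an implementation for Pin<P>, and StreamExt is implemented for every Stream. stream.next() therefore resolves with Self = Pin<&mut WrappedStream<_>>, which is Unpin, and that is why it does not cause an error in your code.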
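To see the resolution order in isolation, here is a minimal std-only sketch. Source, SourceExt, Countdown and drain_countdown are hypothetical stand-ins (for Stream, Stream2, a !Unpin stream and your main loop); they are not part of futures, and the synchronous poll_item is a simplification of poll_next:

```rust
use std::cell::Cell;
use std::marker::PhantomPinned;
use std::ops::DerefMut;
use std::pin::{pin, Pin};

// Hypothetical stand-in for `Stream`: a synchronous, pinned "poll" method.
pub trait Source {
    fn poll_item(self: Pin<&mut Self>) -> Option<u32>;
}

// Hypothetical stand-in for `Stream2`: a convenience method that needs `Self: Unpin`.
pub trait SourceExt: Source {
    fn next_item(&mut self) -> Option<u32>
    where
        Self: Unpin,
    {
        Pin::new(self).poll_item()
    }
}

// A deliberately `!Unpin` source that counts down from `left` to 1.
pub struct Countdown {
    left: Cell<u32>,
    _pin: PhantomPinned,
}

impl Countdown {
    pub fn new(start: u32) -> Self {
        Countdown { left: Cell::new(start), _pin: PhantomPinned }
    }
}

impl Source for Countdown {
    fn poll_item(self: Pin<&mut Self>) -> Option<u32> {
        // `Cell` lets us update the counter through a shared reference,
        // so no `unsafe` is needed to reach the field behind the `Pin`.
        let n = self.left.get();
        if n == 0 {
            return None;
        }
        self.left.set(n - 1);
        Some(n)
    }
}
impl SourceExt for Countdown {}

// Forwarding impls for pinned pointers, analogous to the blanket impl above.
impl<P> Source for Pin<P>
where
    P: DerefMut + Unpin,
    P::Target: Source,
{
    fn poll_item(self: Pin<&mut Self>) -> Option<u32> {
        // `Pin<P>` is `Unpin` because `P` is, so `get_mut` is allowed here.
        self.get_mut().as_mut().poll_item()
    }
}
impl<P> SourceExt for Pin<P>
where
    P: DerefMut + Unpin,
    P::Target: SourceExt,
{
}

pub fn drain_countdown(start: u32) -> Vec<u32> {
    // `source` has type `Pin<&mut Countdown>`, like `stream` after `pin_mut!`.
    let mut source = pin!(Countdown::new(start));
    let mut items = Vec::new();
    // Resolves to `<Pin<&mut Countdown> as SourceExt>::next_item`, whose
    // `Self` is `Unpin` even though `Countdown` is not.
    while let Some(n) = source.next_item() {
        items.push(n);
    }
    items
}
```

If the `impl<P> SourceExt for Pin<P>` block is removed, the call in drain_countdown should be rejected for the same reason as next2 in your code: lookup falls through to the Countdown impl, and Countdown is not Unpin.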