asynchronousrustchannelpolling

Why does `while let` in a Future implementation trigger the Waker, but `if let` does not?


I've been working on a Future implementation in Rust, and I've encountered behavior I don't fully understand. Specifically, I'm using std::sync::mpsc::Receiver inside the poll method, and I'm trying to decide between while let and if let for receiving messages.

Here's the simplified version of my poll method:

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
    let this = self.get_mut();

    // Option 1: Using `while let`
    while let Ok(new_attributes) = this.receiver.try_recv() {
        // Process the message
    }

    // Option 2: Using `if let`
    if let Ok(new_attributes) = this.receiver.try_recv() {
        // Process the message
    }

    Poll::Pending
}

What I’ve Observed

When I use while let, everything seems to work as expected: the Future is re-polled when new messages arrive. However, when I use if let, the Future appears to hang and never wakes up again, even when new messages are available.

My Understanding

I know that the Waker is supposed to be used to notify the executor that the Future should be re-polled. Given that my poll function always returns Poll::Pending, I would expect the Future to continue being polled repeatedly unless the process itself stops. However, I don't understand why while let ensures the Waker is triggered correctly, whereas if let does not seem to do the same.

Additional Context

The Future is spawned like this:

ctx.task_executor()
    .spawn_critical_blocking("transaction execution service", Box::pin(fut_struct));

The spawn_critical_blocking function works as follows:

pub fn spawn_critical_blocking<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
where
    F: Future<Output = ()> + Send + 'static,
{
    self.spawn_critical_as(name, fut, TaskKind::Blocking)
}

And internally:

fn spawn_critical_as<F>(
    &self,
    name: &'static str,
    fut: F,
    task_kind: TaskKind,
) -> JoinHandle<()>
where
    F: Future<Output = ()> + Send + 'static,
{
    // Wrapping the future and handling task errors
    let task = std::panic::AssertUnwindSafe(fut)
        .catch_unwind()
        .map_err(...);

    let task = async move {
        // Handling task shutdown and execution
        let task = pin!(task);
        let _ = select(on_shutdown, task).await;
    };

    self.spawn_on_rt(task, task_kind)
}

The spawn_on_rt method then uses spawn_blocking.

My Question

not?

Any insights or explanations would be greatly appreciated!


Solution

  • Before I address your questions, I'd like to challenge one of your assumptions:

    Given that my poll function always returns Poll::Pending, I would expect the Future to continue being polled repeatedly

    This, while technically possible, is not at all to be expected. Normal async runtimes (like tokio) attempt to be efficient by only polling when progress has been made, as signaled using the Waker conveyed by the Context.

    Notice that you never use the Context argument which, unless your future always returns Poll::Ready immediately, is an error, as there is no reliable reason for your poll function to ever be called again. (technically it's possible for it to be called again, because async runtimes may spuriously poll futures, but you shouldn't ever rely on that!)

    Why does using while let ensure that the Waker is triggered, but using if let does not?

    There is nothing about while let that is guaranteed to trigger the Waker. It is for this reason that the following code doesn't work:

    use std::task::{Context, Poll};
    use std::pin::Pin;
    use std::future::Future;
    use std::sync::mpsc::{channel, Receiver};
    
    struct Demo{
        receiver: Receiver<()>,
    }
    
    impl Future for Demo {
        type Output = ();
        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
            let this = self.get_mut();
            println!("poll");
        
            // Option 1: Using `while let`
            while let Ok(_) = this.receiver.try_recv() {
                println!("processing");
                // Process the message
            }
        
            Poll::Pending
        }
    }
    
    #[tokio::main]
    pub async fn main() {
        let (sender, receiver) = channel();
        let demo = Demo{receiver};
        let thread = std::thread::spawn(move || {
            sender.send(()).unwrap();
        });
        println!("before");
        demo.await;
        println!("after");
        thread.join().unwrap();
    }
    

    Output:

    before
    poll
    

    What is the underlying mechanism that causes this difference in behavior?

    The only difference between if let and while let is that the latter will continue to read from the channel if there are more messages buffered. An alternative, more plausible, explanation for what you are seeing is that while let allows a single invocation of poll to process multiple messages while doing nothing to schedule a subsequent invocation of poll.

    How to fix

    The main issue with your code is that you are trying to use as synchronous channel implementation from the standard library in an async context; They're incompatible, and mixing them in this way is incorrect and unreliable. You should switch to an async-based channel such as tokio::sync::mpsc:

    use std::task::{Context, Poll};
    use std::pin::Pin;
    use std::future::Future;
    use tokio::sync::mpsc::{channel, Receiver};
    
    struct Demo{
        receiver: Receiver<()>,
    }
    
    impl Future for Demo {
        type Output = ();
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let this = self.get_mut();
            
            println!("poll");
        
            if let Poll::Ready(result) = this.receiver.poll_recv(cx) {
                if let Some(_message) = result {
                    println!("processed");
                } else {
                    println!("channel closed");
                }
                return Poll::Ready(());
            }
        
            Poll::Pending
        }
    }
    
    #[tokio::main]
    pub async fn main() {
        let (sender, receiver) = channel(10);
        let demo = Demo{receiver};
        let task = tokio::spawn(async move {
            sender.send(()).await.unwrap();
        });
        println!("before");
        demo.await;
        println!("after");
        task.await.unwrap();
    }
    

    Notice how receiving on the channel now involves an inner poll-like call that takes the Context as an argument. If there is not currently a message to receive, the Waker will be associated with the channel (in an implementation-specific way) such that it may cause the future to be polled when a message arrives.

    Output:

    before
    poll
    poll
    processsed
    after
    

    Notice there are two polls. Presumably, the first one encountered no message to receive. Proper use of async polling meant that the future was productively polled again.