I've been working on a Future implementation in Rust, and I've encountered behavior I don't fully understand. Specifically, I'm using std::sync::mpsc::Receiver
inside the poll
method, and I'm trying to decide between while let
and if let
for receiving messages.
Here's the simplified version of my poll method:
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// Option 1: Using `while let`
while let Ok(new_attributes) = this.receiver.try_recv() {
// Process the message
}
// Option 2: Using `if let`
if let Ok(new_attributes) = this.receiver.try_recv() {
// Process the message
}
Poll::Pending
}
What I’ve Observed
When I use while let, everything seems to work as expected: the Future is re-polled when new messages arrive. However, when I use if let, the Future appears to hang and never wakes up again, even when new messages are available.
My Understanding
I know that the Waker is supposed to be used to notify the executor that the Future should be re-polled. Given that my poll function always returns Poll::Pending, I would expect the Future to continue being polled repeatedly unless the process itself stops. However, I don't understand why while let ensures the Waker is triggered correctly, whereas if let does not seem to do the same.
Additional Context
The Future is spawned like this:
ctx.task_executor()
.spawn_critical_blocking("transaction execution service", Box::pin(fut_struct));
The spawn_critical_blocking function works as follows:
pub fn spawn_critical_blocking<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
where
F: Future<Output = ()> + Send + 'static,
{
self.spawn_critical_as(name, fut, TaskKind::Blocking)
}
And internally:
fn spawn_critical_as<F>(
&self,
name: &'static str,
fut: F,
task_kind: TaskKind,
) -> JoinHandle<()>
where
F: Future<Output = ()> + Send + 'static,
{
// Wrapping the future and handling task errors
let task = std::panic::AssertUnwindSafe(fut)
.catch_unwind()
.map_err(...);
let task = async move {
// Handling task shutdown and execution
let task = pin!(task);
let _ = select(on_shutdown, task).await;
};
self.spawn_on_rt(task, task_kind)
}
The spawn_on_rt
method then uses spawn_blocking
.
My Question
not?
Any insights or explanations would be greatly appreciated!
Before I address your questions, I'd like to challenge one of your assumptions:
Given that my poll function always returns
Poll::Pending
, I would expect theFuture
to continue being polled repeatedly
This, while technically possible, is not at all to be expected. Normal async
runtimes (like tokio
) attempt to be efficient by only polling when progress has been made, as signaled using the Waker
conveyed by the Context
.
Notice that you never use the Context
argument which, unless your future always returns Poll::Ready
immediately, is an error, as there is no reliable reason for your poll
function to ever be called again. (technically it's possible for it to be called again, because async
runtimes may spuriously poll
futures, but you shouldn't ever rely on that!)
Why does using while let ensure that the
Waker
is triggered, but using if let does not?
There is nothing about while let
that is guaranteed to trigger the Waker
. It is for this reason that the following code doesn't work:
use std::task::{Context, Poll};
use std::pin::Pin;
use std::future::Future;
use std::sync::mpsc::{channel, Receiver};
struct Demo{
receiver: Receiver<()>,
}
impl Future for Demo {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
println!("poll");
// Option 1: Using `while let`
while let Ok(_) = this.receiver.try_recv() {
println!("processing");
// Process the message
}
Poll::Pending
}
}
#[tokio::main]
pub async fn main() {
let (sender, receiver) = channel();
let demo = Demo{receiver};
let thread = std::thread::spawn(move || {
sender.send(()).unwrap();
});
println!("before");
demo.await;
println!("after");
thread.join().unwrap();
}
Output:
before
poll
What is the underlying mechanism that causes this difference in behavior?
The only difference between if let
and while let
is that the latter will continue to read from the channel if there are more messages buffered. An alternative, more plausible, explanation for what you are seeing is that while let
allows a single invocation of poll
to process multiple messages while doing nothing to schedule a subsequent invocation of poll
.
The main issue with your code is that you are trying to use as synchronous channel implementation from the standard library in an async
context; They're incompatible, and mixing them in this way is incorrect and unreliable. You should switch to an async
-based channel such as tokio::sync::mpsc
:
use std::task::{Context, Poll};
use std::pin::Pin;
use std::future::Future;
use tokio::sync::mpsc::{channel, Receiver};
struct Demo{
receiver: Receiver<()>,
}
impl Future for Demo {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
println!("poll");
if let Poll::Ready(result) = this.receiver.poll_recv(cx) {
if let Some(_message) = result {
println!("processed");
} else {
println!("channel closed");
}
return Poll::Ready(());
}
Poll::Pending
}
}
#[tokio::main]
pub async fn main() {
let (sender, receiver) = channel(10);
let demo = Demo{receiver};
let task = tokio::spawn(async move {
sender.send(()).await.unwrap();
});
println!("before");
demo.await;
println!("after");
task.await.unwrap();
}
Notice how receiving on the channel now involves an inner poll
-like call that takes the Context
as an argument. If there is not currently a message to receive, the Waker
will be associated with the channel (in an implementation-specific way) such that it may cause the future to be polled when a message arrives.
Output:
before
poll
poll
processsed
after
Notice there are two polls. Presumably, the first one encountered no message to receive. Proper use of async
polling meant that the future was productively polled again.