When I run:
use tokio::sync::mpsc;

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        while let Some(i) = rx.recv().await {
            println!("got = {}", i);
        }
    });

    for i in 0..5 {
        // busy calculation
        std::thread::sleep(std::time::Duration::from_millis(10));

        match tx.try_send(i) {
            Ok(_) => {
                println!("sent = {}", i);
            }
            Err(err) => {
                println!("{}", err);
            }
        }
    }
}
I got:
sent = 0
got = 0
sent = 1
got = 1
sent = 2
got = 2
sent = 3
got = 3
sent = 4
From my understanding, the only worker is busy with the for loop because the loop never yields, so that worker should have no chance to run the receiving task. Therefore, the channel should be full after the first send. It turns out I am wrong. What am I missing?
The code in a #[tokio::main] function does not actually run on a worker thread. Therefore, the spawned task is sent to the only worker thread, while the for loop is executed on the program's main thread.
Under the hood, tokio::main lifts the function body into an async block, builds a runtime, then passes the future for the generated async block to Runtime::block_on. According to this method's documentation:
Note that the future required by this function does not run as a worker. The expectation is that other tasks are spawned by the future here. Awaiting on other futures from the future provided here will not perform as fast as those spawned as workers.
You can get the behavior you are expecting in two ways.
The first way is to lift the body of your main function into a new task:
use tokio::sync::mpsc;

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn main() {
    tokio::spawn(async move {
        let (tx, mut rx) = mpsc::channel(1);

        tokio::spawn(async move {
            while let Some(i) = rx.recv().await {
                println!("got = {}", i);
            }
        });

        for i in 0..5 {
            // busy calculation
            std::thread::sleep(std::time::Duration::from_millis(10));

            match tx.try_send(i) {
                Ok(_) => {
                    println!("sent = {}", i);
                }
                Err(err) => {
                    println!("{}", err);
                }
            };
        }
    })
    .await
    .unwrap();
}
The second way is to use the "current thread" runtime instead of the multithreaded runtime. This runs all tasks, including the body of main, on the main thread:
use tokio::sync::mpsc;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    tokio::spawn(async move {
        while let Some(i) = rx.recv().await {
            println!("got = {}", i);
        }
    });

    for i in 0..5 {
        // busy calculation
        std::thread::sleep(std::time::Duration::from_millis(10));

        match tx.try_send(i) {
            Ok(_) => {
                println!("sent = {}", i);
            }
            Err(err) => {
                println!("{}", err);
            }
        };
    }
}
Both of these will show "sent = 0", and then every subsequent try_send will fail with "no available capacity."