Tags: rust, channel, rust-tokio, mpsc

mpsc channel gets stuck at the receiver


use std::sync::Arc;

use tokio::sync::mpsc;

async fn student(id : i32,tx : Arc<mpsc::Sender<String>>) {
    println!("student {} is getting their hw.",id);
    tx.send(format!("student {}'s hw !",id)).await.unwrap();
}

async fn teacher(mut rc : mpsc::Receiver<String>) -> Vec<String> {
    let mut homeworks = Vec::new();
    while let Some(hw) = rc.recv().await {
        println!("{hw}");
        homeworks.push(hw);
    }
    homeworks
}

#[tokio::main]
async fn main() {
    let (tx,rc): (mpsc::Sender<String>, mpsc::Receiver<String>) = mpsc::channel(100);
    let ch_arc: Arc<mpsc::Sender<String>> = Arc::new(tx);
    
    for i in 0..10 {
        tokio::task::spawn(student(i,ch_arc.clone()));
    }
    let hws = teacher(rc).await;
    println!("{:?}",hws);
}

I am currently learning about channels in Rust and have run into a problem. When execution reaches the line teacher(rc).await, the program gets stuck and I have no idea why.
One thing I noticed while debugging: when I spawn multiple tasks, it hangs in the loop, but when I spawn only one task at the student(i,ch_arc.clone()) line, it works fine. Can anybody help me out with this?


Solution

  • Take a look at this loop here:

    while let Some(hw) = rc.recv().await {
    

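    recv only returns None once every sender has been dropped and any messages still in the buffer have been received; dropping the senders is what hangs up the channel. The sender is in an Arc, so it is dropped only when every clone of that Arc has been dropped.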

    There are 10 Arc clones inside the student futures, and each is dropped when its task finishes. The tasks have nothing to wait on, so that happens quickly. However, one Arc is left over: ch_arc is not dropped until the end of main, which comes after the call to teacher. teacher cannot return while ch_arc is still alive, so ch_arc is never dropped and the program hangs.

    You can fix this by manually dropping ch_arc:

    #[tokio::main]
    async fn main() {
        let (tx, rc): (mpsc::Sender<String>, mpsc::Receiver<String>) = mpsc::channel(100);
        let ch_arc: Arc<mpsc::Sender<String>> = Arc::new(tx);
    
        for i in 0..10 {
            tokio::task::spawn(student(i, ch_arc.clone()));
        }
    
        drop(ch_arc);
    
        let hws = teacher(rc).await;
        println!("{:?}", hws);
    }
    

    Or by creating ch_arc inside a block, so that it is dropped when the block ends. The block evaluates to rc since you need to use rc later.

    #[tokio::main]
    async fn main() {
        let rc = {
            let (tx, rc): (mpsc::Sender<String>, mpsc::Receiver<String>) = mpsc::channel(100);
            let ch_arc: Arc<mpsc::Sender<String>> = Arc::new(tx);
    
            for i in 0..10 {
                tokio::task::spawn(student(i, ch_arc.clone()));
            }
            rc
        };
    
        let hws = teacher(rc).await;
        println!("{:?}", hws);
    }
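
As a side note, the Arc wrapper is not needed for this pattern: tokio's mpsc::Sender implements Clone, and every clone counts as a sender that keeps the channel open. The hang-up rule itself is also not specific to tokio. The sketch below reproduces the fix with std::sync::mpsc and threads instead of tokio, an assumption made only so that it compiles without external crates. collect_homework is a hypothetical helper name, not something from the question.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: spawns `n` student threads and collects their homework.
/// std::sync::mpsc stands in for tokio::sync::mpsc here (assumption for illustration).
fn collect_homework(n: i32) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();

    let mut handles = Vec::new();
    for id in 0..n {
        // Each student gets its own clone of the sender; no Arc is involved.
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            tx.send(format!("student {}'s hw !", id)).unwrap();
            // This clone of `tx` is dropped when the closure returns.
        }));
    }

    // Drop the original sender. Without this line the loop below never ends,
    // because one sender would stay alive for as long as this function runs.
    drop(tx);

    // The iterator stops once all senders are gone and the queue is drained.
    let mut homeworks: Vec<String> = rx.iter().collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Arrival order depends on thread scheduling, so sort for a stable result.
    homeworks.sort();
    homeworks
}
```

Calling collect_homework(10) from main returns the ten homework strings. Commenting out the drop(tx) line brings back the hang described above, which is a quick way to confirm the cause.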