rustasync-awaitfutureasyncsocket

How to run futures containing borrowed TcpStream concurrently?


I am trying to make this code snippet run concurrently instead of sequentially since the number of peers can be a large value. I am using async_std 1.4 and rust 1.41

pub struct Peer {
    pub peer_id: String,
    pub tcp_stream: Arc<TcpStream>,
    pub public_key: [u8; 32],
}

async fn send_to_all_peers(message: Protocol, peers: &HashMap<String,Peer>) -> Result<()> {
    for peer in peers.values() {
        let mut stream = &*peer.tcp_stream;
        stream.write_all(&bincode::serialize(&message)?).await?;
    }
    Ok(())
}

I've tried to use the futures::future::join_all method without any luck since wrapping future I created and used within async_std::task::spawn requires a static lifetime. Here is what I tried:

async fn send_to_all_peers(message: Protocol, peers: &HashMap<String,Peer>) {
    let handles = peers.values().into_iter().map(|peer| {
        task::spawn(
            async {
                let mut stream = &*peer.tcp_stream;
                if let Err(err) = stream
                    .write_all(&bincode::serialize(&message).unwrap())
                    .await
                {
                    error!("Error when writing to tcp_stream: {}", err);
                }
            }
        )
    });
    futures::future::join_all(handles).await;
}

I'm sure there is some method I am missing, thanks for any help!


Solution

  • Since you are trying to send message concurrently, each task has to have its own copy of the message:

    use async_std::{task, net::TcpStream};
    use futures::{future, io::AsyncWriteExt};
    use serde::Serialize;
    use std::{
        collections::HashMap,
        error::Error,
        sync::Arc,
    };
    
    pub struct Peer {
        pub peer_id: String,
        pub tcp_stream: Arc<TcpStream>,
        pub public_key: [u8; 32],
    }
    
    #[derive(Serialize)]
    struct Protocol;
    
    async fn send_to_all_peers(
        message: Protocol,
        peers: &HashMap<String, Peer>)
        -> Result<(), Box<dyn Error>>
    {
        let msg = bincode::serialize(&message)?;
        let handles = peers.values()
            .map(|peer| {
                let msg = msg.clone();
                let socket = peer.tcp_stream.clone();
                task::spawn(async move {
                    let mut socket = &*socket;
                    socket.write_all(&msg).await
                })
            });
    
        future::try_join_all(handles).await?;
        Ok(())
    }