socketsrustconcurrencytransport

Deadlock issues caused by concurrent read and write of rust?


How to solve the problem of using blocking sockets to separate reads and writes between different threads during concurrent reads and writes of rust, where threads share data (transport or socket) and use arcto cause deadlocks?

here is a simple example.

application client write by rust:

use std::env;
use std::io::{Error, Read, Write};
use std::net::{TcpStream, ToSocketAddrs};
use std::ptr::eq;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::time::sleep;

#[derive(Debug)]
pub struct TcpTransport {
    pub conn: TcpStream,
}

impl TcpTransport {
    fn send_packet(&mut self, data: &[u8]) -> Result<(), Error> {
        self.conn.write_all(data)?;
        Ok(())
    }

    fn read_packet(&mut self) -> Result<Vec<u8>, Error> {
        let mut lenbuf = [0u8; 2];
        self.conn.read_exact(&mut lenbuf)?;

        let length = (lenbuf[0] as usize) << 8 | lenbuf[1] as usize;
        let mut databuf = vec![0u8; length];
        self.conn.read_exact(&mut databuf)?;

        Ok(databuf)
    }

    fn close(&self) -> Result<(), Error> {
        println!("close transport");
        self.conn.shutdown(std::net::Shutdown::Both)?;
        Ok(())
    }
}

pub fn new_tcp_transport(host: &str, port: u16) -> Result<TcpTransport, Error> {
    let socket_addr = (host, port).to_socket_addrs()?.next().unwrap();
    let conn = TcpStream::connect_timeout(&socket_addr, Duration::from_secs(10))?;
    Ok(TcpTransport { conn })
}

#[tokio::main]
async fn main() {
    // tsport Arc<Mutex><dyn Transport+Send+sync>
    let tsport = new_tcp_transport("127.0.0.1", 4000).unwrap();
    let tsport = Arc::new(Mutex::new(tsport));
    let tsport1 = tsport.clone();
    let tsport2 = tsport.clone();
    tokio::spawn(async move {
        println!("read worker started");
        'read_loop: loop {
            // sleep(Duration::from_millis(1000)).await;
            println!("read exec here===");
            let packet = match tsport1.lock().unwrap().read_packet() {
                Ok(packet) => packet,
                Err(err) => {
                    eprintln!("Transport read packet error: {:?}", err);
                    break 'read_loop;
                }
            };
            println!("read packet{:?}", packet);
        }
        println!("read worker stoped");
    });
    let _=tokio::spawn(async move {
        println!("write worker started");
        'writeloop: loop {
            sleep(Duration::from_millis(1000)).await;
            println!("write exec here===");
            let mut ts = tsport2.lock().unwrap();
            let data = vec![0x1,0x2,0x3,0x4];
            if let Err(er) = ts.send_packet(&data) {
                eprintln!("Transport write packet error: {:?}", er);
                break 'writeloop;
            }
            println!("send packet success.");
        }
        println!("write worker stoped");
    }).await;
}


application server write by nodejs:

var net = require('net');
net.createServer(function(socket){
    socket.on('data', function(data){
        console.log("server recv data:",data);
    });
}).listen(4000);

console.log('server listen 127.0.0.1:4000');

cargo.toml

[package]
name = "readwrite"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1.28.1", features = ["full"] }

application client write by nodejs:

var net = require('net');

var client = new net.Socket();
client.connect(4000, '127.0.0.1', function() {
    console.log('Connected');
    setInterval(() => {
        var data = Buffer.from("hello server");
        var datalen = data.length;
        console.log('client write===');
        client.write(Buffer.concat([Buffer.from([datalen >> 8, datalen % 256]), data]));
    }, 1000);

});

client.on('data', function(data) {
    console.log('client received: ',data);
});

Solution

  • You can use the fact that &TcpStream (a reference to TcpStream) also implements Read and Write:

    impl TcpTransport {
        fn send_packet(&self, data: &[u8]) -> Result<(), Error> {
            (&self.conn).write_all(data)?;
            Ok(())
        }
    
        fn read_packet(&self) -> Result<Vec<u8>, Error> {
            let mut lenbuf = [0u8; 2];
            (&self.conn).read_exact(&mut lenbuf)?;
    
            let length = (lenbuf[0] as usize) << 8 | lenbuf[1] as usize;
            let mut databuf = vec![0u8; length];
            (&self.conn).read_exact(&mut databuf)?;
    
            Ok(databuf)
        }
    }
    
    #[tokio::main]
    async fn main() {
        let tsport = new_tcp_transport("127.0.0.1", 4000).unwrap();
        let tsport = Arc::new(tsport);
        let tsport1 = tsport.clone();
        let tsport2 = tsport.clone();
        tokio::spawn(async move {
            println!("read worker started");
            'read_loop: loop {
                // sleep(Duration::from_millis(1000)).await;
                let packet = match tsport1.read_packet() {
                    Ok(packet) => packet,
                    Err(err) => {
                        eprintln!("Transport read packet error: {:?}", err);
                        break 'read_loop;
                    }
                };
                println!("read packet{:?}", packet);
            }
            println!("read worker stoped");
        });
        let _ = tokio::spawn(async move {
            println!("write worker started");
            'writeloop: loop {
                // sleep(Duration::from_millis(1000)).await;
                let data = vec![0x1, 0x2, 0x3, 0x4];
                if let Err(er) = tsport2.send_packet(&data) {
                    eprintln!("Transport write packet error: {:?}", er);
                    break 'writeloop;
                }
                println!("send packet success.");
            }
            println!("write worker stoped");
        })
        .await;
    }
    

    But your code is incorrect because you must never, ever, block in async context. And std's TcpStream is blocking. You need to use tokio's TcpStream. It does not implement AsyncRead or AsyncWrite for references, but tokio has tokio::io::split() to split an AsyncRead + AsyncWrite into halves:

    use std::io::Error;
    use std::time::Duration;
    
    use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf};
    use tokio::net::TcpStream;
    
    pub async fn new_tcp_transport(host: &str, port: u16) -> Result<TcpStream, Error> {
        let connect_fut = TcpStream::connect((host, port));
        let conn = tokio::time::timeout(Duration::from_secs(10), connect_fut).await??;
        Ok(conn)
    }
    
    async fn read_packet(conn: &mut ReadHalf<TcpStream>) -> Result<Vec<u8>, Error> {
        let mut lenbuf = [0u8; 2];
        conn.read_exact(&mut lenbuf).await?;
    
        let length = (lenbuf[0] as usize) << 8 | lenbuf[1] as usize;
        let mut databuf = vec![0u8; length];
        conn.read_exact(&mut databuf).await?;
    
        Ok(databuf)
    }
    
    #[tokio::main]
    async fn main() {
        // tsport Arc<Mutex><dyn Transport+Send+sync>
        let tsport = new_tcp_transport("127.0.0.1", 4000).await.unwrap();
        let (mut read, mut write) = tokio::io::split(tsport);
        tokio::spawn(async move {
            println!("read worker started");
            'read_loop: loop {
                // sleep(Duration::from_millis(1000)).await;
                let packet = match read_packet(&mut read).await {
                    Ok(packet) => packet,
                    Err(err) => {
                        eprintln!("Transport read packet error: {:?}", err);
                        break 'read_loop;
                    }
                };
                println!("read packet{:?}", packet);
            }
            println!("read worker stoped");
        });
        let _ = tokio::spawn(async move {
            println!("write worker started");
            'writeloop: loop {
                // sleep(Duration::from_millis(1000)).await;
                let data = vec![0x1, 0x2, 0x3, 0x4];
                if let Err(er) = write.write_all(&data).await {
                    eprintln!("Transport write packet error: {:?}", er);
                    break 'writeloop;
                }
                println!("send packet success.");
            }
            println!("write worker stoped");
        })
        .await;
    }