Tags: multithreading, rust, threadpool, rust-cargo, rust-tokio

What is the benefit of using tokio instead of OS threads in Rust?


I am trying to write a multithreaded TCP communication program in Rust.

The idea is that a listening socket lives on the main thread and, as connections come in, the work is handled by worker threads.

I previously used the ThreadPool approach I found in the Rust book, but as I understand it, tokio is able to "automatically" assign work to threads from a pool.

I am confused about the difference between OS threads and tokio tasks (mostly because you use spawn to create both).

Here is some code:

use std::net::{IpAddr, Ipv4Addr, SocketAddr};

fn main() {
    println!("Hello World!");
    let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 103, 7)), 2048);
    println!("socket -> {}", socket);

    // A tokio runtime is needed to execute async functions.
    // new_current_thread() runs every task on this single thread.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let listener = StartListen::new(socket).await.unwrap();
    });
}

I also have a StartListen struct defined in another file:

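use std::net::SocketAddr;
use std::time::Duration;

use tokio::net::TcpListener;
use tokio::time::sleep;

// Defines the StartListen struct
pub struct StartListen {
    listener: TcpListener,
}

// Implementation for the StartListen struct.
// StartListenError is defined elsewhere (not shown); it needs a
// BindError(std::io::Error) variant for this to compile.
impl StartListen {
    pub async fn new(socket: SocketAddr) -> Result<StartListen, StartListenError> {
        println!("Attempting to listen");
        let bind_result = TcpListener::bind(socket).await;
        // Asynchronous sleep: suspends this task without blocking the thread
        sleep(Duration::from_secs(5)).await;
        match bind_result {
            Ok(listener) => {
                println!("Server is listening on {}", socket);
                Ok(StartListen { listener })
            }
            Err(err) => Err(StartListenError::BindError(err)),
        }
    }
}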

To add some more context, this socket expects two types of messages:

// Defines the types of messages to expect
pub enum RequestType {
    RequestWork {
        request_message: String,
        parameter: String,
        sender_timestamp: String,
    },
    CloseConnection {
        initial_timestamp: String,
        final_timestamp: String,
    },
    Invalid(String),
}

I have yet to add a handle_connection method. Would I have to define handle_connection to sit in a loop and spawn tasks?

    // Requires `use tokio::io::AsyncReadExt;` in scope for `stream.read`.
    pub async fn accept_connections(&self) {
        loop {
            let (mut stream, addr) = self.listener.accept().await.unwrap();
            println!("New connection from {}", addr);

            // Spawn a new task to handle the connection
            tokio::spawn(async move {
                let mut buffer = [0; 1024];
                loop {
                    let n = match stream.read(&mut buffer).await {
                        Ok(0) => return, // Connection closed
                        Ok(n) => n,
                        Err(e) => {
                            eprintln!("Error reading from socket: {}", e);
                            return;
                        }
                    };

                    // Convert the received message into a RequestType
                    let message = String::from_utf8_lossy(&buffer[..n]);
                    // ...
                }
            });
        }
    }

Solution

  • You are very close to understanding already. You are asking the right questions.

    Let me give you a short rundown of possible server implementations:
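
For contrast with the tokio code in the question, here is a minimal sketch of one plain OS-thread design: one thread per connection, using only the standard library. The names `handle_connection` and `spawn_echo_server`, and the choice to simply echo bytes back, are assumptions made for illustration and do not come from the question.

```rust
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::thread;

/// Echo everything received on `stream` back to the peer until it disconnects.
/// (Hypothetical handler; the question's RequestType parsing would go here.)
fn handle_connection(mut stream: TcpStream) -> std::io::Result<()> {
    let mut buffer = [0u8; 1024];
    loop {
        // A blocking read: this OS thread does nothing else while it waits.
        let n = stream.read(&mut buffer)?;
        if n == 0 {
            return Ok(()); // Connection closed by the peer
        }
        stream.write_all(&buffer[..n])?;
    }
}

/// Bind to `addr`, accept connections on a background thread, and start one
/// new OS thread per accepted connection. Returns the address actually bound.
fn spawn_echo_server(addr: &str) -> std::io::Result<SocketAddr> {
    let listener = TcpListener::bind(addr)?;
    let local = listener.local_addr()?;
    thread::spawn(move || {
        for stream in listener.incoming() {
            match stream {
                Ok(stream) => {
                    thread::spawn(move || {
                        if let Err(e) = handle_connection(stream) {
                            eprintln!("connection error: {e}");
                        }
                    });
                }
                Err(e) => eprintln!("accept error: {e}"),
            }
        }
    });
    Ok(local)
}

fn main() -> std::io::Result<()> {
    // Port 0 asks the OS for any free port.
    let addr = spawn_echo_server("127.0.0.1:0")?;
    let mut client = TcpStream::connect(addr)?;
    client.write_all(b"ping")?;
    let mut reply = [0u8; 4];
    client.read_exact(&mut reply)?;
    println!("echoed: {}", String::from_utf8_lossy(&reply));
    Ok(())
}
```

Every connection here costs a full OS thread that sits idle inside `read` while it waits for data. A thread pool caps that cost, and an async runtime avoids it by suspending the waiting handler instead of the thread.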

    There are a lot of subtleties to be considered, though. I strongly recommend reading the tokio tutorial; it explains many of those concepts. After that, I recommend reading the async book.

    For example, some important subtleties: