rust

Multithreaded Web Server from the Rust book, but implemented with shared memory instead of message passing


I'm following the Rust Book's chapter 20 to create a multithreaded web server. Everything went pretty smoothly; the implementation they describe up until the end of this chapter works as expected.

Here's the code for it, just so that you don't have to gather it from pieces:

// main.rs
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};
use std::time::Duration;
use std::{fs, thread};
use thr::ThreadPool;

fn main() {
    // Serve forever on localhost:7878, dispatching every accepted
    // connection to a fixed pool of four worker threads.
    let pool = ThreadPool::new(4);
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    listener.incoming().for_each(|stream| {
        let conn = stream.unwrap();
        pool.execute(|| handle_connection(conn));
    });
}

/// Read the request line from `stream`, pick a response, and write it back.
fn handle_connection(mut stream: TcpStream) {
    // Routing only needs the first line (method, path, HTTP version).
    let request_line = BufReader::new(&stream)
        .lines()
        .next()
        .unwrap()
        .unwrap();

    let (status_line, file_name) = if request_line == "GET / HTTP/1.1" {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if request_line == "GET /sleep HTTP/1.1" {
        // Simulate a slow request so the pool's behavior can be observed.
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let html = fs::read_to_string(file_name).unwrap();
    let len = html.len();
    let resp = format!("{status_line}\r\nContent-Length: {len}\r\n\r\n{html}");

    stream.write_all(resp.as_bytes()).unwrap();
}
// lib.rs
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;

/// Handle to one pool thread; `id` is only used for log output, and
/// `handle` would be needed to join the thread on shutdown (not done here).
struct Worker {
    id: usize,
    handle: JoinHandle<()>,
}

impl Worker {
    /// Spawn a thread that repeatedly pulls jobs off the shared channel
    /// and runs them.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let handle = thread::spawn(move || {
            loop {
                // The MutexGuard is a temporary inside this statement, so
                // the lock is released *before* `job()` runs. Workers block
                // in `recv()` (sleeping, not spinning) until a job arrives.
                let job = receiver.lock().unwrap().recv().unwrap();

                println!("Worker {id} got a job; executing.");

                job();
            }
        });
        Worker { id, handle }
    }
}

/// A boxed closure that a worker runs exactly once.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Thread pool that distributes jobs to its workers over an mpsc channel.
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);
        let (sender, receiver) = mpsc::channel();

        // All workers share the single receiver; the Mutex ensures only
        // one of them pulls a given job off the channel.
        let receiver = Arc::new(Mutex::new(receiver));

        for id in 0..size {
            let worker = Worker::new(id, Arc::clone(&receiver));
            workers.push(worker);
        }

        ThreadPool { workers, sender }
    }

    /// Fallible alternative to [`ThreadPool::new`].
    ///
    /// # Errors
    ///
    /// Returns `PoolCreationError` if `size` is zero.
    pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
        // BUG FIX: the condition was inverted (`if size > 0`), which made
        // `build` reject every valid size and then call `new(0)` for the
        // invalid one, panicking on the assert instead of returning Err.
        if size == 0 {
            return Err(PoolCreationError);
        }
        Ok(ThreadPool::new(size))
    }

    /// Queue a closure to be run by one of the pool's worker threads.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.sender.send(Box::new(f)).unwrap();
    }
}

/// Error returned by [`ThreadPool::build`] when `size` is zero.
/// Deriving `Debug` lets callers use `unwrap`/`expect` on the Result.
#[derive(Debug)]
pub struct PoolCreationError;

I have decided to rewrite it using the memory sharing mechanism instead of message passing, just for educational purposes. There are no changes in the main.rs file, and the updated lib.rs looks like this:

// lib.rs
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;

/// Handle to one pool thread; `id` and `handle` are currently unread —
/// `handle` would be needed to join the thread on shutdown.
struct Worker {
    id: usize,
    handle: JoinHandle<()>,
}

impl Worker {
    /// Spawn a thread that repeatedly polls the shared queue for jobs.
    ///
    /// NOTE(review): unlike the channel version there is no blocking here —
    /// when the queue is empty each worker spins at full speed, re-acquiring
    /// the lock on every iteration and pinning a core (hence the fans).
    fn new(id: usize, queue: Arc<Mutex<Vec<Job>>>) -> Worker {
        let handle = thread::spawn(move || {
            loop {
                // The guard is a temporary, so the lock is released before
                // `job()` runs. `pop()` takes from the back of the Vec, so
                // jobs execute in LIFO order (the channel version is FIFO).
                let maybe_job = queue.lock().unwrap().pop();
                if let Some(job) = maybe_job {
                    job()
                }
            }
        });
        Worker { id, handle }
    }
}

/// A boxed closure that a worker runs exactly once.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// Thread pool whose workers share a Vec of pending jobs behind a Mutex.
pub struct ThreadPool {
    workers: Vec<Worker>,
    queue: Arc<Mutex<Vec<Job>>>,
}

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);
        // `size` is only an initial capacity hint; the queue grows as needed.
        let queue = Arc::new(Mutex::new(Vec::with_capacity(size)));

        for id in 0..size {
            let worker = Worker::new(id, Arc::clone(&queue));
            workers.push(worker);
        }

        ThreadPool { workers, queue }
    }

    /// Fallible alternative to [`ThreadPool::new`].
    ///
    /// # Errors
    ///
    /// Returns `PoolCreationError` if `size` is zero.
    pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
        // BUG FIX: the condition was inverted (`if size > 0`), which made
        // `build` reject every valid size and then call `new(0)` for the
        // invalid one, panicking on the assert instead of returning Err.
        if size == 0 {
            return Err(PoolCreationError);
        }
        Ok(ThreadPool::new(size))
    }

    /// Queue a closure to be run by one of the pool's worker threads.
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.queue.lock().unwrap().push(Box::new(f));
    }
}

/// Error returned by [`ThreadPool::build`] when `size` is zero.
/// Deriving `Debug` lets callers use `unwrap`/`expect` on the Result.
#[derive(Debug)]
pub struct PoolCreationError;

In short, instead of creating an mpsc::channel, passing a reference to the receiver to the workers, and holding the sender on the ThreadPool instance, I'm now holding in the ThreadPool instance a queue, which is a vector of jobs (Arc<Mutex<Vec<Job>>>), and passing each worker a cloned Arc handle to that same queue.

Inside the worker, I'm making it pop the queue (which yields a job) and execute it.

Both versions compile and work. From what I can tell, they work identically (from the user's perspective, i.e. when making requests from a browser). The question is, is it really so? Am I not missing something? Are message passing and memory sharing really identical in this particular use case? What are other implications of my approach I don't see?

P.S. The only "noticeable" distinction between the two versions for me is that my laptop's fans seemingly start to spin faster with the memory-sharing implementation. However, I'm not able to explain why by looking at the system resources monitor, for example (and it's also a bit unclear to me why that happens at all).


Solution

  • loop {
        let maybe_job = queue.lock().unwrap().pop();
        if let Some(job) = maybe_job {
            job()
        }
    }
    

    This is a busy loop: it keeps your cores pinned at 100% because each thread repeatedly tries, and fails, to obtain a task from the `Vec`. You never tell a thread that it can "sleep" to save power and give other threads time on that core. You can do that by waiting on a `Condvar` whenever a pop from the `Vec` fails; then, whenever an item is pushed, the sender notifies the waiters.

    impl Worker {
        /// Spawn a worker thread that sleeps on `condvar` until a job is
        /// pushed onto the shared `queue`, then runs it.
        fn new(id: usize, queue: Arc<Mutex<Vec<Job>>>, condvar: Arc<Condvar>) -> Worker {
            let handle = thread::spawn(move || loop {
                let mut guard = queue.lock().unwrap();
                // Sleep (atomically releasing the lock) until execute()
                // signals a push; loop on the condition to tolerate
                // spurious wakeups, and keep the guard `wait` returns.
                while guard.is_empty() {
                    guard = condvar.wait(guard).unwrap();
                }
                let job = guard.pop().unwrap();
                // BUG FIX: release the lock *before* running the job.
                // Holding the guard across `job()` (as the original did)
                // serializes the whole pool to one job at a time.
                drop(guard);
                job();
            });
            Worker { id, handle }
        }
    }
    
    
    /// Pool state: the workers, the shared job queue, and the condition
    /// variable used to wake idle workers when a job is pushed.
    pub struct ThreadPool {
        workers: Vec<Worker>,
        queue: Arc<Mutex<Vec<Job>>>,
        condvar: Arc<Condvar>,
    }
    
    impl ThreadPool {
        /// Create a pool of `size` workers sharing one queue and one condvar.
        pub fn new(size: usize) -> ThreadPool {
            assert!(size > 0);

            let mut workers = Vec::with_capacity(size);
            let queue = Arc::new(Mutex::new(Vec::with_capacity(size)));
            let condvar = Arc::new(Condvar::new());

            for id in 0..size {
                let worker = Worker::new(id, Arc::clone(&queue), condvar.clone());
                workers.push(worker);
            }

            ThreadPool { workers, queue,  condvar}
        }

        ....

        /// Push a job onto the queue, then wake one sleeping worker.
        pub fn execute<F>(&self, f: F)
        where
            F: FnOnce() + Send + 'static,
        {
            // The guard is a temporary, so the lock is already released by
            // the time notify_one() runs — no waiter wakes up just to block.
            self.queue.lock().unwrap().push(Box::new(f));
            self.condvar.notify_one();
        }
    }
    

    `mpsc::Receiver::recv` puts the thread to sleep until an item is put into the channel, at which point a sleeping thread is woken up — similar to the `Condvar`. The main difference between a channel and a `Vec` is that the channel is a FIFO queue, while a `Vec` used as a stack is LIFO.

    You are likely better off using a multi-producer multi-consumer channel (for example the `crossbeam-channel` crate, or the still-unstable `std::sync::mpmc`): its receiver behaves like `Arc<Mutex<mpsc::Receiver<Job>>>` but is more optimized.