multithreading rust deque

How to use VecDeque in multi-threaded app?


I am trying to create a multi-threaded app using a VecDeque. I want to use it as a shared queue that all threads can read from and write to. I have the following code:

use std::collections::VecDeque;
use std::{thread, time};

fn main() {
    let mut workload = VecDeque::new();
    workload.push_back(0);

    let mut thread_1_queue = workload.clone();
    let thread_1 = thread::spawn(move || {
        let mut counter1: i32 = 0;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter1 +=1;
            thread_1_queue.push_back(counter1);

            println!("Thread #1: {:?}", thread_1_queue);

            if counter1 == 10 {
                break;
            }

            thread::sleep(some_time);
        };
    });

    let mut thread_2_queue = workload.clone();
    let thread_2 = thread::spawn(move || {
        let mut counter2: i32 = 10;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter2 +=1;
            thread_2_queue.push_back(counter2);

            println!("Thread #2: {:?}", thread_2_queue);

            if counter2 == 20 {
                break;
            }

            thread::sleep(some_time);
        };
    });

    let some_time = time::Duration::from_millis(50);

    loop {
        if workload.capacity() == 10 {
            break;
        }

        println!("MainQueue: {:?}", workload);

        thread::sleep(some_time);
    }

    thread_1.join();
    thread_2.join();
}

(Beware that this code runs endlessly.)

My problem is that the clones in the threads don't update the main queue. Each thread now has its own queue instead of all threads sharing one, as shown here:

Thread #1: [0, 1]
MainQueue: [0]
Thread #2: [0, 11]
Thread #1: [0, 1, 2]
Thread #2: [0, 11, 12]
MainQueue: [0]
MainQueue: [0]
Thread #2: [0, 11, 12, 13]
Thread #1: [0, 1, 2, 3]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14]
Thread #1: [0, 1, 2, 3, 4]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15]
Thread #1: [0, 1, 2, 3, 4, 5]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15, 16]
Thread #1: [0, 1, 2, 3, 4, 5, 6]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17, 18]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7, 8]
MainQueue: [0]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
MainQueue: [0]

Solution

    use std::collections::VecDeque;
    use std::sync::{Arc, Mutex};
    use std::{thread, time};
    
    fn main() {
        let workload = Arc::new(Mutex::new(VecDeque::new()));
        workload.lock().unwrap().push_back(0);
    
        let thread_1_queue = workload.clone();
        let thread_1 = thread::spawn(move || {
            let mut counter1: i32 = 0;
            let some_time = time::Duration::from_millis(50);
    
            loop {
                counter1 += 1;
                thread_1_queue.lock().unwrap().push_back(counter1);
    
                println!("Thread #1: {:?}", thread_1_queue.lock().unwrap());
    
                if counter1 == 10 {
                    break;
                }
    
                thread::sleep(some_time);
            }
        });
    
        let thread_2_queue = workload.clone();
        let thread_2 = thread::spawn(move || {
            let mut counter2: i32 = 10;
            let some_time = time::Duration::from_millis(50);
    
            loop {
                counter2 += 1;
                thread_2_queue.lock().unwrap().push_back(counter2);
    
                println!("Thread #2: {:?}", thread_2_queue.lock().unwrap());
    
                if counter2 == 20 {
                    break;
                }
    
                thread::sleep(some_time);
            }
        });
    
        let some_time = time::Duration::from_millis(50);
    
        loop {
            if workload.lock().unwrap().capacity() == 10 {
                break;
            }
    
            println!("MainQueue: {:?}", workload.lock().unwrap());
    
            thread::sleep(some_time);
        }
    
        thread_1.join();
        thread_2.join();
    }
    
    Thread #1: [0, 1]
    MainQueue: [0, 1]
    Thread #2: [0, 1, 11]
    MainQueue: [0, 1, 11]
    Thread #2: [0, 1, 11, 12]
    Thread #1: [0, 1, 11, 12, 2]
    MainQueue: [0, 1, 11, 12, 2]
    Thread #2: [0, 1, 11, 12, 2, 13]
    Thread #1: [0, 1, 11, 12, 2, 13, 3]
    MainQueue: [0, 1, 11, 12, 2, 13, 3]
    Thread #2: [0, 1, 11, 12, 2, 13, 3, 14]
    Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4]
    MainQueue: [0, 1, 11, 12, 2, 13, 3, 14, 4]
    Thread #2: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15]
    Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15, 5]
    ...
    

    Explanation

    Arc is a thread-safe (atomically) reference-counted pointer that lets several threads share ownership of a single object. On its own, Arc only gives shared, read-only access to its content, because Rust never allows multiple mutable references to the same object.

    That's why you need a Mutex inside the Arc. Mutex provides what is called interior mutability: you can use it to temporarily get mutable access to the object through a shared reference, while it makes sure that this mutable access never overlaps with access from another thread.

    This also means that when a thread calls lock() while the Mutex is already locked by another thread, the calling thread blocks until the lock is released. This lock contention is a bottleneck and limits the speedup you can get from multithreading.
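One common way to keep that bottleneck small is to hold the lock only for the queue operation itself and do the slow work after the lock is released. The following is a minimal sketch of that idea, not part of the original code: `take_next` and `process_all` are hypothetical helper names, and the 1 ms sleep merely stands in for real work.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: locks the queue just long enough to pop one item.
/// The guard is dropped when this function returns, so the lock is released
/// before the caller starts working on the item.
fn take_next(queue: &Mutex<VecDeque<i32>>) -> Option<i32> {
    let mut guard = queue.lock().unwrap();
    guard.pop_front()
}

/// Hypothetical helper: spawns `workers` threads that drain a shared queue
/// and returns the sum of all items they processed.
fn process_all(items: Vec<i32>, workers: usize) -> i32 {
    let workload = Arc::new(Mutex::new(VecDeque::from(items)));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let queue = Arc::clone(&workload);
            thread::spawn(move || {
                let mut local_sum = 0;
                while let Some(item) = take_next(&queue) {
                    // The lock is not held here, so other threads can pop
                    // their own items while this one is "working".
                    thread::sleep(Duration::from_millis(1));
                    local_sum += item;
                }
                local_sum
            })
        })
        .collect();

    // Collect the per-thread results; unwrap propagates a worker panic.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let total = process_all((1..=10).collect(), 3);
    println!("total = {}", total);
}
```

Each item is popped exactly once, so the total is the same regardless of how the items are distributed across the threads.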

    Also be aware that between two lock() calls, the content of the queue can change. In the code above, each thread locks once to push and a second time to print, so another thread can push in between. If it's important that something happens atomically to the queue, you need to keep the queue locked for the entire duration of that action, which further reduces your speedup.
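As a sketch of what "keeping the queue locked for the entire action" can look like, the helper below pushes a value and copies the queue's content under a single lock guard, so no other thread can modify the queue between the two steps. The name `push_and_snapshot` is made up for this illustration.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical helper: pushes `value` and returns a copy of the queue,
/// both under the same lock guard.
fn push_and_snapshot(queue: &Mutex<VecDeque<i32>>, value: i32) -> Vec<i32> {
    let mut guard = queue.lock().unwrap();
    guard.push_back(value);
    let snapshot: Vec<i32> = guard.iter().copied().collect();
    snapshot
    // `guard` is dropped here, which releases the lock.
}

fn main() {
    let workload = Arc::new(Mutex::new(VecDeque::<i32>::new()));

    let handles: Vec<_> = (1..=4)
        .map(|id| {
            let queue = Arc::clone(&workload);
            thread::spawn(move || {
                let snapshot = push_and_snapshot(&queue, id);
                // Because push and read share one lock, the value this thread
                // pushed is always the last element of its own snapshot.
                assert_eq!(snapshot.last(), Some(&id));
                println!("Thread #{}: {:?}", id, snapshot);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    println!("MainQueue: {:?}", workload.lock().unwrap());
}
```

The printing happens after the lock is released, on a private copy, so the slow I/O does not extend the time the queue is locked.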

    Further bugs

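    The main loop never terminates because it checks capacity() == 10. capacity() is the number of elements the queue can hold without reallocating, not the number of elements currently in it, and it may never be exactly 10. The fixed code checks len() >= 10 instead; using >= matters because two threads are pushing, so the length can jump past 10 between two checks. It also unwraps the Result returned by join(), so a panic in a worker thread is not silently ignored.

    Fixed code that doesn't run forever: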

    use std::collections::VecDeque;
    use std::sync::{Arc, Mutex};
    use std::{thread, time};
    
    fn main() {
        let workload = Arc::new(Mutex::new(VecDeque::new()));
        workload.lock().unwrap().push_back(0);
    
        let thread_1_queue = workload.clone();
        let thread_1 = thread::spawn(move || {
            let mut counter1: i32 = 0;
            let some_time = time::Duration::from_millis(50);
    
            loop {
                counter1 += 1;
                thread_1_queue.lock().unwrap().push_back(counter1);
    
                println!("Thread #1: {:?}", thread_1_queue.lock().unwrap());
    
                if counter1 == 10 {
                    break;
                }
    
                thread::sleep(some_time);
            }
        });
    
        let thread_2_queue = workload.clone();
        let thread_2 = thread::spawn(move || {
            let mut counter2: i32 = 10;
            let some_time = time::Duration::from_millis(50);
    
            loop {
                counter2 += 1;
                thread_2_queue.lock().unwrap().push_back(counter2);
    
                println!("Thread #2: {:?}", thread_2_queue.lock().unwrap());
    
                if counter2 == 20 {
                    break;
                }
    
                thread::sleep(some_time);
            }
        });
    
        let some_time = time::Duration::from_millis(50);
    
        loop {
            if workload.lock().unwrap().len() >= 10 {
                break;
            }
    
            println!("MainQueue: {:?}", workload.lock().unwrap());
    
            thread::sleep(some_time);
        }
    
        thread_1.join().unwrap();
        thread_2.join().unwrap();
    }
    
    Thread #1: [0, 1]
    Thread #2: [0, 1, 11]
    MainQueue: [0, 1, 11]
    Thread #1: [0, 1, 11, 2]
    Thread #2: [0, 1, 11, 2, 12]
    MainQueue: [0, 1, 11, 2, 12]
    Thread #1: [0, 1, 11, 2, 12, 3]
    MainQueue: [0, 1, 11, 2, 12, 3]
    Thread #2: [0, 1, 11, 2, 12, 3, 13]
    MainQueue: [0, 1, 11, 2, 12, 3, 13]
    Thread #2: [0, 1, 11, 2, 12, 3, 13, 14]
    Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4]
    MainQueue: [0, 1, 11, 2, 12, 3, 13, 14, 4]
    Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15]
    Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5]
    Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16]
    Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6]
    Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17]
    Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7]
    Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18]
    Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8]
    Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19]
    Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9]
    Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9, 20]
    Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9, 20, 10]
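To see why the fixed code checks len() rather than capacity(), here is a small sketch that prints both values for a few queue sizes. `len_and_capacity` is a hypothetical helper for this illustration. The exact capacity values depend on the standard library's growth strategy, so no particular numbers are assumed here; the only guarantee relied on is that capacity() is never smaller than len().

```rust
use std::collections::VecDeque;

/// Hypothetical helper: pushes `n` elements into a fresh queue and reports
/// how many elements it holds and how many it has room for.
fn len_and_capacity(n: usize) -> (usize, usize) {
    let mut queue: VecDeque<usize> = VecDeque::new();
    for i in 0..n {
        queue.push_back(i);
    }
    (queue.len(), queue.capacity())
}

fn main() {
    for n in [0, 1, 5, 10, 11] {
        let (len, capacity) = len_and_capacity(n);
        // len always equals the number of pushed elements; capacity only
        // says how much memory has been reserved so far.
        println!("pushed {:>2}: len = {:>2}, capacity = {}", n, len, capacity);
    }
}
```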