multithreading, rust, thread-safety

Value in thread is not updated consistently


I'm trying to write a solution for this problem:

Simulate a simple stock exchange comprising 4 threads: a stock generator, an incrementor, and 2 broadcasters. The generator continually iterates through a stock vector, sending stocks to the incrementor. The incrementor receives the stock and adjusts the value/price of the stock, sending it to one of the 2 broadcasters. The broadcaster prints out the name and current value of the stock.

I'm always stuck at the part where I'm unable to update a stock. The main issue I always get is that the variable containing a mutable reference to the stock is dropped too early. I've tried to do signalling with a condvar, but my lecturer says that it is more complicated than it needs to be. So I'm reworking it from the ground up. I understand that my entire concept/model of how to solve the problem is wrong, but I'm lost as to what I'm actually supposed to do.

This is my code so far:

fn main() {
    let mut stocks = vec![
        Stock::new("Apple"),
        Stock::new("Microsoft"),
        Stock::new("Tesla"),
        Stock::new("Amazon"),
    ];

    let stocks = Arc::new(Mutex::new(stocks.as_mut_slice()));

    let (stock_tx, stock_rx) = unbounded(); // from crossbeam_channel

    loop {
        stock_generator(stocks.clone(), stock_tx.clone());
        stock_incrementor(stock_rx.clone());
    }
}

fn stock_generator(stocks: Arc<Mutex<&mut [Stock]>>, stock_tx: CrossSender<&mut Stock>) {
    thread::spawn(move || {
        if let Ok(mut guard) = stocks.lock() {
            for stock in guard.iter_mut() {
                stock_tx.send(stock).unwrap();
            }
        }
    });
}

fn stock_incrementor(stock_rx: CrossReceiver<&mut Stock>) {
    thread::spawn(move || loop {
        let stock = stock_rx.recv().unwrap();
        stock.price += 1;
    });
}

This is the crate I'm using for the unbounded call where I create stock_tx and stock_rx: https://docs.rs/crossbeam/latest/crossbeam/channel/index.html

I also tried other things such as this:

fn main() {
    let stocks = vec![
        Stock::new("Apple"),
        Stock::new("Microsoft"),
        Stock::new("Tesla"),
        Stock::new("Amazon"),
    ];

    let stocks = Arc::new(Mutex::new(stocks));

    let (stock_tx, stock_rx) = unbounded();

    stock_generator(stocks.clone(), stock_tx.clone(), stock_rx.clone());
    stock_incrementor(stock_tx.clone(), stock_rx.clone());
    broadcaster(1, stock_rx.clone());
    broadcaster(2, stock_rx.clone());
    loop {}
}

fn stock_generator(
    stocks: Arc<Mutex<Vec<Stock>>>,
    stock_tx: CrossSender<Stock>,
    stock_rx: CrossReceiver<Stock>,
) {
    thread::spawn(move || loop {
        // get updated stock if there is one
        // otherwise use original vec
        if let Ok(updated_stock) = stock_rx.try_recv() {
            let mut guard = stocks.lock().unwrap();
            for stock in guard.iter_mut() {
                if stock.name == updated_stock.name {
                    *stock = updated_stock.clone();
                }
            }
        } // guard is dropped here

        let guard = stocks.lock().unwrap();
        for stock in &*guard {
            // send over to incrementor
            stock_tx.send(stock.clone()).unwrap();
        } // guard is dropped here
    });
}

fn stock_incrementor(stock_tx: CrossSender<Stock>, stock_rx: CrossReceiver<Stock>) {
    thread::spawn(move || loop {
        let mut stock = stock_rx.recv().unwrap();
        stock.price = stock.price.saturating_add(1);
        stock_tx.send(stock).unwrap(); // send updated stock price back to generator and broadcaster
    });
}

fn broadcaster(id: u8, stock_rx: CrossReceiver<Stock>) {
    thread::spawn(move || loop {
        if let Ok(stock) = stock_rx.try_recv() {
            println!("{id}: {} at {}", stock.name, stock.price);
        }
    });
}

I get output which is wrong. Or it's the code that's wrong. I'm not sure because threads don't go in order so I have no idea what's going on right now.

1: Microsoft at 109
2: Apple at 136
1: Microsoft at 109 <--
2: Apple at 136
1: Apple at 136
2: Apple at 137
1: Microsoft at 107 <--
2: Tesla at 239
1: Amazon at 129

So Microsoft was at 109 but at the bottom it appears as 107. As to why this happens I have my guess. This is what I have in my head for the first iteration:

  1. try_recv() fails.
  2. guard is obtained
  3. Looping through the Vec inside the guard and each item is sent over to stock_incrementor via a channel.

So here stock_incrementor gets to work. It receives the stock, updates the price, and sends it to the broadcaster via another channel. It also sends it to the stock_generator thread.

  1. guard is dropped when the for loop in stock_generator finishes.

stock_generator's work is done, it can just loop again. So it does. This is iteration two:

  1. try_recv() succeeds (assuming it does)
  2. Repeat.

This second iteration only works properly if the stock_incrementor has finished its operation of basically += 1. But what happens if it hasn't? It basically gets duplicate data sent from stock_generator.

I edited the code a few more times and it didn't help much. I'm very much lost and I'm not sure exactly what I should be correcting, if anything needs correcting at all. Any help is greatly appreciated.


Solution

  • Your queue sequencing makes no sense: the exercise describes a pipeline of generator -> incrementor -> (broadcast1 | broadcast2), but you're using a single queue for all communication, and the generator pulls back from that queue as well. Every thread is competing on that one queue, so the entire thing races itself depending on the OS's scheduling decisions: your stocks could cycle multiple times through the incrementor, bounce between the generator and incrementor, or go straight from generator to broadcaster.

    So Microsoft was at 109 but at the bottom it appears as 107. As to why this happens I have my guess.

    There's a trivial way for that to happen: the generator sends it out, the incrementor pulls it, increments it twice (because it increments, sends, then immediately retrieves the same stock), then sends it back out, and a broadcaster grabs it and prints it. Since the stock never went back through the generator, the generator never got the updated value, so the next time it sends that stock it sends the original value. If that copy is pulled directly by a broadcaster, the original value is what gets printed.
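One way to get the sequencing the exercise asks for is to give every hop its own channel, so nothing can flow backwards or skip a stage. The following is only a minimal sketch under several assumptions that are not in the original code: it uses std::sync::mpsc instead of crossbeam so that it compiles without dependencies, the Stock struct and the starting price of 100 are made up, the generator sends vector indices rather than stock copies so that only one authoritative price exists, the incrementor alternates between the two broadcasters, and the hypothetical run_exchange function stops after a fixed number of rounds (and returns what was printed) instead of looping forever.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Assumed shape of the asker's Stock type (the original definition is not shown).
#[derive(Clone, Debug)]
struct Stock {
    name: String,
    price: u32,
}

/// Runs generator -> incrementor -> (broadcaster 1 | broadcaster 2) for
/// `rounds` passes over the stock list and returns every
/// (broadcaster id, name, price) that a broadcaster printed.
fn run_exchange(rounds: usize) -> Vec<(u8, String, u32)> {
    // Hypothetical starting price of 100 for every stock.
    let stocks = Arc::new(Mutex::new(
        ["Apple", "Microsoft", "Tesla", "Amazon"]
            .iter()
            .map(|name| Stock { name: name.to_string(), price: 100 })
            .collect::<Vec<_>>(),
    ));
    let count = stocks.lock().unwrap().len();

    // One channel per hop, each with exactly one consumer.
    let (gen_tx, inc_rx) = mpsc::channel::<usize>();
    let (b1_tx, b1_rx) = mpsc::channel::<Stock>();
    let (b2_tx, b2_rx) = mpsc::channel::<Stock>();
    let (out_tx, out_rx) = mpsc::channel::<(u8, String, u32)>();

    // Generator: walks the stock list and sends each index to the incrementor.
    let generator = thread::spawn(move || {
        for _ in 0..rounds {
            for index in 0..count {
                gen_tx.send(index).unwrap();
            }
        }
        // gen_tx is dropped here, which closes the channel and lets the
        // incrementor's loop end.
    });

    // Incrementor: the only thread that changes prices. It sends a snapshot
    // of the updated stock to one broadcaster, alternating between the two.
    let shared = Arc::clone(&stocks);
    let incrementor = thread::spawn(move || {
        let outputs = [b1_tx, b2_tx];
        for (n, index) in inc_rx.iter().enumerate() {
            let snapshot = {
                let mut guard = shared.lock().unwrap();
                let stock = &mut guard[index];
                stock.price = stock.price.saturating_add(1);
                stock.clone()
            }; // lock released before sending
            outputs[n % 2].send(snapshot).unwrap();
        }
    });

    // Broadcasters: print whatever arrives and record it for the caller.
    let broadcasters: Vec<_> = vec![(1u8, b1_rx), (2u8, b2_rx)]
        .into_iter()
        .map(|(id, rx)| {
            let out_tx = out_tx.clone();
            thread::spawn(move || {
                for stock in rx {
                    println!("{id}: {} at {}", stock.name, stock.price);
                    out_tx.send((id, stock.name, stock.price)).unwrap();
                }
            })
        })
        .collect();
    drop(out_tx);

    generator.join().unwrap();
    incrementor.join().unwrap();
    for handle in broadcasters {
        handle.join().unwrap();
    }
    out_rx.iter().collect()
}
```

Calling run_exchange(3) from main prints twelve lines. How the lines from broadcaster 1 and broadcaster 2 interleave still depends on scheduling, but because each stock passes through the incrementor exactly once per round, no price is skipped, repeated, or printed with a stale value. With crossbeam you could instead keep a single incrementor -> broadcaster channel and clone its receiver for both broadcasters.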