multithreading rust concurrency mutex thread-synchronization

How to run synchronous code in Rust (one thread at a time)?


I'm building a REST API in Rust with the warp web framework and the tokio async runtime. There is a sequence of operations in my code that threads must not execute concurrently; they need to execute it one at a time. How do I do that in Rust?

This is the block of code that needs to be locked:

async fn execute() {
    client_dao.get_client(id);
    let new_balance = calculate_new_balance();
    client_dao.transaction(id, new_balance, transaction_entity);
}

My attempts:

use std::sync::Mutex;

async fn execute() {
    let mutex = Mutex::new(0);
    let _guard = mutex.lock().unwrap();

    client_dao.get_client(id);
    let new_balance = calculate_new_balance();
    client_dao.transaction(id, new_balance, transaction_entity);
}

This resulted in an error:

async function: the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, Client>`

In this answer, https://stackoverflow.com/a/67277503/12253990, I saw that with an async runtime I need to use a mutex made for async code, so I tried this:

use tokio::sync::Mutex;

async fn execute() {
    let mutex = Mutex::new(0);
    let _guard = mutex.lock().await;

    client_dao.get_client(id);
    let new_balance = calculate_new_balance();
    client_dao.transaction(id, new_balance, transaction_entity);
}

It didn't give me an error, but the mutex didn't work: there was still a race condition. I wrote a k6 script that calls the endpoint twice at the same time, each call passing the value 50, for a client whose balance is 0. The final balance should be 100, but it is 50 because the threads overwrite each other's result.

Another thing I tried:

use tokio::sync::Mutex;
use std::sync::Arc;

async fn execute() {
    let mutex = Arc::new(Mutex::new(0));
    let _guard = mutex.lock().await;

    client_dao.get_client(id);
    let new_balance = calculate_new_balance();
    client_dao.transaction(id, new_balance, transaction_entity);
}

The result was the same: there is no error, but the race condition still happens and the threads are not executing one at a time. How do I do this in Rust?


Solution

  • I solved the problem using stored functions. I moved the processing and the business rules into the database, where I can lock the selected record for update so that other transactions cannot modify it until the current transaction completes. Only one transaction at a time can modify the selected data, which avoids the race condition.

    Another thing that could work is this code:

    use tokio::sync::Mutex;
    use std::sync::Arc;
    
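    // Create this mutex once (e.g. at startup) and hand a clone of the Arc to
    // each request; a mutex built inside the handler is new on every call.
    let mutex = Arc::new(Mutex::new(0));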
    
    // Start an asynchronous task
    tokio::spawn(async move {
        let mut data = mutex.lock().await;
    
        // Perform all operations while the Mutex is locked
        client_dao.get_client(id).await;
        let new_balance = calculate_new_balance();
        client_dao.transaction(id, new_balance, transaction_entity).await;
    
        // Modify shared state
        *data += 1;
    });
    

    A dev from the Rust community sent it to me, but I didn't get to test it because I had already solved the problem with stored functions.
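
A likely reason the attempts in the question still raced is that `Mutex::new` is called inside `execute`, so every call locks its own brand-new mutex and never waits for any other call. The sketch below shows the general shape of a fix: create the lock once and share it through `Arc`. To stay self-contained it uses `std::thread` and `std::sync::Mutex` rather than warp and tokio, and `ClientDao`, its in-memory balance, and `run_concurrent_deposits` are hypothetical stand-ins for the real DAO and database. With tokio the same structure should apply using `tokio::sync::Mutex` and `.lock().await`.

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for the database: one client's balance in memory.
struct ClientDao {
    balance: Mutex<i64>,
}

impl ClientDao {
    fn get_client(&self) -> i64 {
        *self.balance.lock().unwrap()
    }

    fn transaction(&self, new_balance: i64) {
        *self.balance.lock().unwrap() = new_balance;
    }
}

// The read-modify-write sequence that must run one caller at a time.
// The lock is passed in, so every caller contends on the same mutex.
fn execute(lock: &Mutex<()>, dao: &ClientDao, amount: i64) {
    let _guard = lock.lock().unwrap();

    let current = dao.get_client();
    // Widen the window in which an unsynchronized caller could interleave.
    thread::sleep(Duration::from_millis(10));
    dao.transaction(current + amount);
}

// Simulates `requests` simultaneous calls and returns the final balance.
fn run_concurrent_deposits(requests: usize, amount: i64) -> i64 {
    // Created once, outside the handler, and shared by all requests.
    let lock = Arc::new(Mutex::new(()));
    let dao = Arc::new(ClientDao { balance: Mutex::new(0) });

    let handles: Vec<_> = (0..requests)
        .map(|_| {
            let lock = Arc::clone(&lock);
            let dao = Arc::clone(&dao);
            thread::spawn(move || execute(&lock, &dao, amount))
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    dao.get_client()
}

fn main() {
    // Two simultaneous requests of 50 against a balance of 0.
    println!("final balance: {}", run_concurrent_deposits(2, 50));
}
```

Note that an in-process lock like this only serializes requests handled by a single server process. If several instances of the API share one database, locking the row in the database, as in the stored-function approach above, is still needed.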