rust, parallel-processing, rayon

Why is Rust's rayon taking longer with Arc<Mutex<Vec<anyhow::Result<()>>>>?


I am trying to parallelize my code using the rayon crate. The process is to read a file, process it, and write out the processed file.

I want to record the result of processing each file, so I have an Arc<Mutex<Vec<anyhow::Result<()>>>> that I lock in order to push the anyhow::Result<()> produced for each file.

fn main() -> anyhow::Result<()> {
    let (mut files, _) = utils::get_files_from_folder(input_folder)?;

    let results = Arc::new(Mutex::new(Vec::<anyhow::Result<()>>::new()));

    files.par_iter_mut().for_each(|path| {
        
        if let Some(extension) = path.extension() {
            if extension == "txt" {
                let result = redact::redact_txt_and_write_json(path, &regex_vec, &output_folder); // processing done here
                results.lock().expect("`results` cannot be locked").push(result); // lock the mutex and push done here
            } else {
                eprintln!(
                    "{}INVALID EXTENSION: {} - Not yet implemented",
                    *RED_ERROR_STRING,
                    extension.to_string_lossy(),
                );
                std::process::exit(1);
            }
        } else {
            eprintln!("{}EXTENSION not found", *RED_ERROR_STRING);
            std::process::exit(1);
        }
    }); // end of for_each
    println!(
        "{:?}", results.as_ref()
    );
    Ok(())
}

My question is: why does the version with locking apparently take so much longer than the version without it?

With locking:

Finished dev [unoptimized + debuginfo] target(s) in 1m 34s

Without locking:

Finished dev [unoptimized + debuginfo] target(s) in 0.30s

Solution

  • Your minimal example is quite convoluted and not reproducible, so I rewrote it to demonstrate the problem you are having:

    use std::{
        sync::{Arc, Mutex},
        time::Instant,
    };
    
    use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
    
    struct SomeResult(u32);
    
    fn do_some_processing(arg: u32) -> SomeResult {
        SomeResult(arg * 2)
    }
    
    fn main() {
        let input_data = (1..10000000).collect::<Vec<_>>();
    
        let t_start = Instant::now();
        let results = Arc::new(Mutex::new(Vec::<SomeResult>::new()));
    
        input_data.par_iter().for_each(|value| {
            let result = do_some_processing(*value);
            results
                .lock()
                .expect("`results` cannot be locked")
                .push(result); // lock the mutex and push done here
        });
        let t_end = Instant::now();
    
        println!("Result item count: {}", results.lock().unwrap().len());
        println!("Time taken: {} ms", (t_end - t_start).as_millis());
    }
    
    Result item count: 9999999
    Time taken: 116 ms
    

    A Mutex is not the optimal data structure here. Locking means that the work rayon parallelized has to be serialized again at the point where each result is added to the Vec, which becomes a major bottleneck. Only one thread can push at a time; all other threads wait inside .lock() until the current holder releases the mutex and the next thread is let in. Basically, they are constantly waiting for each other.
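The contention described above can be reproduced without rayon. The following is a minimal sketch using only `std::thread::scope`; the function names, the worker count of 4, and the doubling workload are illustrative assumptions, not part of the original code. One function pushes every result through a shared Mutex, the other lets each thread fill a private Vec and joins the buffers once at the end.

```rust
use std::sync::Mutex;
use std::thread;
use std::time::Instant;

/// Every push goes through one shared lock, so the threads take turns.
fn push_through_mutex(input: &[u32], threads: usize) -> Vec<u32> {
    let threads = threads.max(1);
    // max(1) keeps chunks() from panicking on empty input.
    let chunk_len = ((input.len() + threads - 1) / threads).max(1);
    let results = Mutex::new(Vec::new());
    thread::scope(|s| {
        for chunk in input.chunks(chunk_len) {
            let results = &results;
            s.spawn(move || {
                for &value in chunk {
                    // The lock is acquired and released once per item.
                    results.lock().expect("mutex poisoned").push(value * 2);
                }
            });
        }
    });
    results.into_inner().expect("mutex poisoned")
}

/// Each thread fills a private Vec; the buffers are joined once at the end.
fn push_to_local_buffers(input: &[u32], threads: usize) -> Vec<u32> {
    let threads = threads.max(1);
    let chunk_len = ((input.len() + threads - 1) / threads).max(1);
    thread::scope(|s| {
        let handles: Vec<_> = input
            .chunks(chunk_len)
            .map(|chunk| s.spawn(move || chunk.iter().map(|&v| v * 2).collect::<Vec<u32>>()))
            .collect();
        // Joining in spawn order keeps the output in input order.
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("worker panicked"))
            .collect()
    })
}

fn main() {
    let input: Vec<u32> = (1..1_000_000).collect();
    let threads = 4; // assumed worker count

    let t = Instant::now();
    let shared = push_through_mutex(&input, threads);
    println!("shared mutex:  {} items in {:?}", shared.len(), t.elapsed());

    let t = Instant::now();
    let local = push_to_local_buffers(&input, threads);
    println!("local buffers: {} items in {:?}", local.len(), t.elapsed());
}
```

Both functions return the same set of values, but only the second keeps them in input order, and it takes no lock while doing the work. Actual timings depend on the machine and build profile, so none are quoted here.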

    rayon has a solution for this use case. Its parallel iterators support .collect(), which has the same effect as putting every item into the vector, just without the lock and highly parallelized.

    With .collect(), the time in my example drops from 116 ms to 3 ms.

    use std::time::Instant;
    
    use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
    
    struct SomeResult(u32);
    
    fn do_some_processing(arg: u32) -> SomeResult {
        SomeResult(arg * 2)
    }
    
    fn main() {
        let input_data = (1..10000000).collect::<Vec<_>>();
    
        let t_start = Instant::now();
        let results: Vec<SomeResult> = input_data
            .par_iter()
            .map(|value| do_some_processing(*value))
            .collect();
        let t_end = Instant::now();
    
        println!("Result item count: {}", results.len());
        println!("Time taken: {} ms", (t_end - t_start).as_millis());
    }
    
    Result item count: 9999999
    Time taken: 3 ms