I am trying to parallelize my code using the `rayon` crate. The process is to read a file, process it, and write the processed file out. I want to record the result of processing each file, so I have an `Arc<Mutex<Vec<anyhow::Result<()>>>>` which I lock, pushing the `anyhow::Result<()>` produced by each file.
```rust
fn main() -> anyhow::Result<()> {
    let (mut files, _) = utils::get_files_from_folder(input_folder)?;
    let results = Arc::new(Mutex::new(Vec::<anyhow::Result<()>>::new()));
    files.par_iter_mut().for_each(|path| {
        if let Some(extension) = path.extension() {
            if extension == "txt" {
                // processing done here
                let result = redact::redact_txt_and_write_json(path, &regex_vec, &output_folder);
                // lock the mutex and push done here
                results.lock().expect("`results` cannot be locked").push(result);
            } else {
                eprintln!(
                    "{}INVALID EXTENSION: {} - Not yet implemented",
                    *RED_ERROR_STRING,
                    extension.to_string_lossy(),
                );
                std::process::exit(1);
            }
        } else {
            eprintln!("{}EXTENSION not found", *RED_ERROR_STRING);
            std::process::exit(1);
        }
    }); // end of for_each
    println!("{:?}", results.as_ref());
    Ok(())
}
```
My question is: why does it apparently take longer with locking than without?

With locking:

```
Finished dev [unoptimized + debuginfo] target(s) in 1m 34s
```

Without locking:

```
Finished dev [unoptimized + debuginfo] target(s) in 0.30s
```
Your minimal example is quite convoluted and not reproducible, so I rewrote it to demonstrate the problem you are having:
```rust
use std::{
    sync::{Arc, Mutex},
    time::Instant,
};

use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};

struct SomeResult(u32);

fn do_some_processing(arg: u32) -> SomeResult {
    SomeResult(arg * 2)
}

fn main() {
    let input_data = (1..10000000).collect::<Vec<_>>();

    let t_start = Instant::now();

    let results = Arc::new(Mutex::new(Vec::<SomeResult>::new()));
    input_data.par_iter().for_each(|value| {
        let result = do_some_processing(*value);
        // lock the mutex and push done here
        results
            .lock()
            .expect("`results` cannot be locked")
            .push(result);
    });

    let t_end = Instant::now();

    println!("Result item count: {}", results.lock().unwrap().len());
    println!("Time taken: {} ms", (t_end - t_start).as_millis());
}
```
```
Result item count: 9999999
Time taken: 116 ms
```
A `Mutex` is not the optimal data structure here. Locking means that the parallelism you achieved through `rayon` has to be serialized again at the point where each result is pushed into the `Vec`, which becomes a major bottleneck. Only one thread can `push` at a time; all the other threads wait inside `.lock()` until that thread is finished and the next one is allowed in. Essentially, the threads are constantly waiting for each other.
`rayon` has a solution for this use case: its parallel iterators support `.collect()`, which puts every item into the vector just like the loop above, only without the lock and highly parallelized. With `.collect()`, the time in my example drops from `116 ms` to `3 ms`:
```rust
use std::time::Instant;

use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};

struct SomeResult(u32);

fn do_some_processing(arg: u32) -> SomeResult {
    SomeResult(arg * 2)
}

fn main() {
    let input_data = (1..10000000).collect::<Vec<_>>();

    let t_start = Instant::now();

    // No Arc<Mutex<...>> needed: collect() assembles the results in parallel.
    let results: Vec<SomeResult> = input_data
        .par_iter()
        .map(|value| do_some_processing(*value))
        .collect();

    let t_end = Instant::now();

    println!("Result item count: {}", results.len());
    println!("Time taken: {} ms", (t_end - t_start).as_millis());
}
```
```
Result item count: 9999999
Time taken: 3 ms
```