rust ownership-semantics

Sharing arrays between threads in Rust


I'm new to Rust and I'm struggling with some ownership semantics.

The goal is to do some nonsense measurements on multiplying 2 f64 arrays and writing the result in a third array.

In the single-threaded version, a single thread takes care of the whole range. In the multi-threaded version, each thread takes care of a segment of the range.

The single-threaded version is easy, but my problem is with the multithreaded version where I'm struggling with the ownership rules.

I was thinking of using raw pointers to bypass the borrow checker, but I still can't get the code to compile.

#![feature(box_syntax)]

use std::time::SystemTime;
use rand::Rng;
use std::thread;


/// Benchmark driver: fills two 1,000,000-element `f64` arrays with random values, then
/// for `rounds` rounds spawns `concurrency` threads that each multiply one segment of
/// the inputs into `res`, and finally prints the average wall-clock time per round.
///
/// NOTE: as written this does not compile. The closure handed to `thread::spawn`
/// captures the raw pointers by reference, and `*mut [f64; 1000000]` is not `Sync`,
/// which is exactly the E0277 error quoted after the code.
fn main() {
    // Number of elements; must stay in sync with the array length literals below.
    let nCells = 1_000_000;
    // Number of worker threads spawned per round.
    let concurrency = 1;

    // `box` is the nightly-only syntax enabled by `#![feature(box_syntax)]`; it puts the
    // arrays on the heap.
    let mut one = box [0f64; 1_000_000];
    let mut two = box [0f64; 1_000_000];
    let mut res = box [0f64; 1_000_000];

    println!("Creating data");
    let mut rng = rand::thread_rng();

    for i in 0..nCells {
        one[i] = rng.gen::<f64>();
        two[i] = rng.gen::<f64>();
        // Redundant: `res` was already zero-initialised by its array literal.
        res[i] = 0 as f64;
    }
    println!("Finished creating data");

    let rounds = 100000;
    let start = SystemTime::now();
    // `Box::into_raw` gives up ownership of the allocations. Nothing in this function
    // turns the pointers back into boxes, so the arrays are never freed.
    let one_raw = Box::into_raw(one);
    let two_raw = Box::into_raw(two);
    let res_raw = Box::into_raw(res);

    let mut handlers = Vec::new();
    for _ in 0..rounds {
        // Integer division: if `nCells` is not a multiple of `concurrency`, the trailing
        // `nCells % concurrency` elements are not covered by any job.
        let sizePerJob = nCells / concurrency;
        for j in 0..concurrency {
            // Half-open index range [from, to) handled by job `j`.
            let from = j * sizePerJob;
            let to = (j + 1) * sizePerJob;
            // The closure is not `move`, so it borrows `one_raw`, `two_raw`, `res_raw`
            // (and `from`/`to`) instead of copying them; this is what the compiler rejects.
            handlers.push(thread::spawn(|| {
                // The nested `unsafe` blocks are redundant; one would be enough.
                unsafe {
                    unsafe {
                        processData(one_raw, two_raw, res_raw, from, to);
                    }
                }
            }));
        }

        // Wait for every job of this round before starting the next one.
        // NOTE(review): `JoinHandle::join` takes the handle by value, so calling it through
        // `get_mut` is likely to be rejected as well once the first error is fixed; the
        // returned `Result` (which reports a panicked worker) is also discarded.
        for j in 0..concurrency {
            handlers.get_mut(j).unwrap().join();
        }

        handlers.clear();
    }

    // Total elapsed wall-clock time in microseconds, then the (truncated) average per round.
    let durationUs = SystemTime::now().duration_since(start).unwrap().as_micros();
    let durationPerRound = durationUs / rounds;
    println!("duration per round {} us", durationPerRound);
}

/// Multiplies `one[i] * two[i]` into `res[i]` for every index `i` in `from..to`.
///
/// The arrays are reached through raw pointers and the function is not marked
/// `unsafe`, so nothing checks the caller's side of the contract: all three pointers
/// must refer to live arrays, and no other code may access `res[from..to]` while
/// this runs. Indices are still bounds-checked, so an index of 1000000 or more panics.
///
/// An empty or inverted range (`from >= to`) does nothing.
// Kept out of line so the function is easy to locate in the generated assembly.
#[inline(never)]
pub fn processData(
    one: *const [f64; 1000000],
    two: *const [f64; 1000000],
    res: *mut [f64; 1000000],
    from: usize,
    to: usize,
) {
    for idx in from..to {
        // SAFETY: relies on the caller contract described above; the elements are
        // accessed in place, without forming references to the whole arrays.
        unsafe {
            let product = (*one)[idx] * (*two)[idx];
            (*res)[idx] = product;
        }
    }
}

This is the error I'm getting

error[E0277]: `*mut [f64; 1000000]` cannot be shared between threads safely
   --> src/main.rs:38:27
    |
38  |             handlers.push(thread::spawn(|| {
    |                           ^^^^^^^^^^^^^ `*mut [f64; 1000000]` cannot be shared between threads safely
    |
    = help: the trait `Sync` is not implemented for `*mut [f64; 1000000]`
    = note: required because of the requirements on the impl of `Send` for `&*mut [f64; 1000000]`
note: required because it's used within this closure
   --> src/main.rs:38:41
    |
38  |             handlers.push(thread::spawn(|| {
    |                                         ^^
note: required by a bound in `spawn`
   --> /home/pveentjer/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:653:8
    |
653 |     F: Send + 'static,
    |        ^^^^ required by this bound in `spawn`

[edit] I know that spawning threads is very expensive. I'll convert this to a pool of worker threads that can be recycled once this code is up and running.


Solution

  • You can use `chunks_mut` or `split_at_mut` to get non-overlapping slices of `one`, `two`, and `res`. Each thread can then safely work on its own slice. See the standard-library documentation for `chunks_mut` and `split_at_mut`.

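    I was able to compile it using scoped threads and `chunks_mut`. I removed all the `unsafe` code because it is no longer needed. (Since Rust 1.63, `std::thread::scope` is stable, so the `scoped_threads` feature gate is not required on current toolchains.) See the code: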

    #![feature(box_syntax)]
    #![feature(scoped_threads)]
    
    use rand::Rng;
    use std::thread;
    use std::time::SystemTime;
    
    /// Benchmark driver: fills two 1,000,000-element `f64` arrays with random values, then
    /// for `rounds` rounds multiplies them element-wise into `res` using scoped threads,
    /// one thread per chunk, and prints the average wall-clock time per round.
    fn main() {
        // Number of elements; must stay in sync with the array length literals below.
        let n_cells = 1_000_000;
        // Target number of worker threads per round.
        let concurrency = 2;
    
        let mut one = box [0f64; 1_000_000];
        let mut two = box [0f64; 1_000_000];
        // `res` starts zeroed and every element is overwritten each round, so it needs
        // no further initialisation.
        let mut res = box [0f64; 1_000_000];
    
        println!("Creating data");
        let mut rng = rand::thread_rng();
    
        for (a, b) in one.iter_mut().zip(two.iter_mut()) {
            *a = rng.gen::<f64>();
            *b = rng.gen::<f64>();
        }
        println!("Finished creating data");
    
        let rounds = 1000;
        // Loop-invariant chunk length. If `n_cells` is not a multiple of `concurrency`,
        // the chunk iterators yield one extra, shorter chunk (and thus one extra thread).
        let size_per_job = n_cells / concurrency;
        let start = SystemTime::now();
    
        for _ in 0..rounds {
            // `thread::scope` joins every spawned thread before it returns, which is what
            // allows the threads to borrow the local arrays.
            thread::scope(|s| {
                // The inputs are only read, so shared chunks suffice; only the output
                // needs disjoint mutable chunks.
                let jobs = one
                    .chunks(size_per_job)
                    .zip(two.chunks(size_per_job))
                    .zip(res.chunks_mut(size_per_job));
                for ((lhs, rhs), out) in jobs {
                    // `move` hands each chunk reference to its thread by value rather
                    // than borrowing the loop-local bindings.
                    s.spawn(move || processData(lhs, rhs, out));
                }
            });
        }
    
        // Total elapsed wall-clock time in microseconds, then the (truncated) average per round.
        let duration_us = SystemTime::now().duration_since(start).unwrap().as_micros();
        let duration_per_round = duration_us / rounds;
        println!("duration per round {} us", duration_per_round);
    }
    
    /// Writes the element-wise product `one[i] * two[i]` into `res[i]` for every index
    /// of `one`.
    ///
    /// `two` and `res` may be longer than `one`; their extra elements are ignored and
    /// left untouched.
    ///
    /// # Panics
    ///
    /// Panics if `two` or `res` is shorter than `one`. The check happens up front, so
    /// `res` is not modified in that case.
    // Make sure we can find the function in the generated Assembly
    #[inline(never)]
    pub fn processData(one: &[f64], two: &[f64], res: &mut [f64]) {
        let len = one.len();
        // Slice once to the common length: a single bounds check per slice instead of
        // three per element, and no partial writes if a length is wrong.
        let (two, res) = (&two[..len], &mut res[..len]);
        for ((out, &a), &b) in res.iter_mut().zip(one).zip(two) {
            *out = a * b;
        }
    }