
Per-thread initialization in Rayon


I am trying to optimize my function using Rayon's par_iter().

The single threaded version is something like:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    let result: Vec<_> = txs.iter().map(|tx| {
        tx.verify_and_store(store)
    }).collect();
    ...
}

Each Store instance must be used by only one thread, but multiple instances of Store can be used concurrently, so I can make this multithreaded by cloning store:

use rayon::prelude::*;

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    let result: Vec<_> = txs.par_iter().map(|tx| {
        // A fresh clone for every transaction: correct, but expensive.
        let mut local_store = store.clone();
        tx.verify_and_store(&mut local_store)
    }).collect();
    ...
}

However, this clones the store on every iteration, which is way too slow. I would like to use one store instance per thread.

Is this possible with Rayon? Or should I resort to manual threading and a work-queue?
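For comparison, the manual-threading alternative raised in the question can be sketched with only the standard library. This is a minimal sketch under stated assumptions, not a drop-in implementation: `Store`, `Tx`, and the `bool` result are hypothetical stand-ins, this `verify_and_store` takes `&Store` rather than `&mut Store`, and a shared `AtomicUsize` index plays the role of the work queue. Each scoped worker clones the store once and then claims transactions until none are left.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

// Hypothetical stand-in for the question's `Store`.
#[derive(Clone, Default)]
struct Store {
    seen: Vec<u64>,
}

// Hypothetical stand-in for the question's `Tx`.
struct Tx {
    id: u64,
}

impl Tx {
    // Hypothetical: records the id in the store; "verification" passes for even ids.
    fn verify_and_store(&self, store: &mut Store) -> bool {
        store.seen.push(self.id);
        self.id % 2 == 0
    }
}

// Each worker clones `store` exactly once, then pulls indices from a shared
// atomic counter until every transaction has been claimed.
fn verify_and_store(store: &Store, txs: &[Tx], n_threads: usize) -> (Vec<bool>, usize) {
    let next = AtomicUsize::new(0);
    let clones = AtomicUsize::new(0);
    let results = Mutex::new(vec![false; txs.len()]);

    thread::scope(|s| {
        for _ in 0..n_threads {
            s.spawn(|| {
                let mut local_store = store.clone();
                clones.fetch_add(1, Ordering::Relaxed);
                loop {
                    let i = next.fetch_add(1, Ordering::Relaxed);
                    if i >= txs.len() {
                        break;
                    }
                    let ok = txs[i].verify_and_store(&mut local_store);
                    results.lock().unwrap()[i] = ok;
                }
                // `local_store` is dropped here, when this worker finishes.
            });
        }
    });

    (results.into_inner().unwrap(), clones.into_inner())
}
```

Because `std::thread::scope` joins every worker before returning, each clone is dropped (running any flush-on-drop logic) before the function returns. With 10 transactions and 4 workers, `store` is cloned exactly 4 times, however the transactions end up distributed.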


Solution

  • For a proper solution to this question, please refer to Mike's answer. The remaining text is kept for historical purposes and as an example of using thread-locals.


    It is possible to use a thread-local variable to ensure that local_store is not created more than once in a given thread.

    For example, this compiles:

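    use std::cell::RefCell;
    use rayon::prelude::*;

    fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
        thread_local! {
            // Each Rayon worker thread lazily gets its own clone of `store`.
            static STORE: RefCell<Option<Store>> = RefCell::new(None);
        }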
    
        let _result: Vec<_> = txs
            .par_iter()
            .map(|tx| {
                STORE.with(|cell| {
                    let mut local_store = cell.borrow_mut();
                    if local_store.is_none() {
                        *local_store = Some(store.clone());
                    }
                    tx.verify_and_store(local_store.as_mut().unwrap())
                })
            })
            .collect();
    }
    

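To check that the thread-local approach really clones at most once per thread, here is a std-only sketch that replaces Rayon's pool with three `std::thread::scope` threads. The `Store` type, its counting `Clone` impl, the `CLONES` counter, and the `process`/`run` helpers are all hypothetical; they exist only to count clones.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Counts how many times the (hypothetical) expensive clone was performed.
static CLONES: AtomicUsize = AtomicUsize::new(0);

// Hypothetical stand-in for `Store`.
struct Store {
    base: u32,
}

impl Clone for Store {
    fn clone(&self) -> Self {
        CLONES.fetch_add(1, Ordering::SeqCst);
        Store { base: self.base }
    }
}

thread_local! {
    // Each OS thread gets its own slot, initially empty.
    static LOCAL: RefCell<Option<Store>> = RefCell::new(None);
}

fn process(store: &Store, x: u32) -> u32 {
    LOCAL.with(|cell| {
        let mut slot = cell.borrow_mut();
        // Clone only the first time this thread touches its slot.
        let local = slot.get_or_insert_with(|| store.clone());
        local.base + x
    })
}

// Spawns `n_threads` fresh threads, each processing `per_thread` items.
fn run(store: &Store, n_threads: u32, per_thread: u32) -> u32 {
    thread::scope(|s| {
        let mut handles = Vec::new();
        for _ in 0..n_threads {
            handles.push(s.spawn(move || {
                (0..per_thread).map(|x| process(store, x)).sum::<u32>()
            }));
        }
        handles.into_iter().map(|h| h.join().unwrap()).sum::<u32>()
    })
}
```

Calling `run(&Store { base: 100 }, 3, 5)` processes 15 items but performs only 3 clones, one per thread.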

    There are two problems with this code, however. First, if the clones of store need to do something when par_iter() is done, such as flush their buffers, that simply won't happen: their Drop runs only when Rayon's worker threads exit, and even that is not guaranteed.

    The second, more serious problem is that the clones of store are created exactly once per worker thread. Since Rayon uses a global thread pool, an unrelated later call to verify_and_store will keep working with the stale clones from an earlier call, which may have nothing to do with the current store.
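The staleness problem does not need multiple threads to show up: any thread that outlives one call keeps its cached value for the next. A minimal single-threaded sketch, where a `String` stands in for `Store` and `store_seen_by_this_thread` is a hypothetical helper:

```rust
use std::cell::RefCell;

thread_local! {
    // Per-thread cache that is filled on first use and never reset.
    static CACHED: RefCell<Option<String>> = RefCell::new(None);
}

// Hypothetical: returns the "store" this thread actually ends up working with.
fn store_seen_by_this_thread(store: &str) -> String {
    CACHED.with(|cell| {
        cell.borrow_mut()
            .get_or_insert_with(|| store.to_string())
            .clone()
    })
}
```

Calling it with `"store A"` and then with `"store B"` on the same thread returns `"store A"` both times, just as a Rayon worker keeps using its clone of an earlier `store`.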

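    This can be rectified by complicating the code somewhat. Each thread registers its clone in a global list the first time it creates one; after the parallel loop finishes, every registered clone is taken out and dropped, so any Drop logic runs at the end of the call and the next call starts from fresh clones of its own store. A global mutex serializes calls so that two concurrent invocations cannot mix their clones. The result is not particularly pretty, but it compiles, uses only safe code, and appears to work: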

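    use std::sync::{Arc, LazyLock, Mutex};
    use rayon::prelude::*;

    fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
        type SharedStore = Arc<Mutex<Option<Store>>>;
    
        // Every per-thread clone created during the current call.
        static STORE_CLONES: LazyLock<Mutex<Vec<SharedStore>>> = LazyLock::new(Default::default);
        // Serializes calls, so two concurrent calls never share clones.
        static NO_REENTRY: LazyLock<Mutex<()>> = LazyLock::new(Default::default);
    
        thread_local! {
            static STORE: SharedStore = SharedStore::default();
        }
    
        let _no_reentry = NO_REENTRY.lock();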
    
        let _result: Vec<_> = txs
            .par_iter()
            .map(|tx| {
                STORE.with(|arc_mtx| {
                    let mut local_store = arc_mtx.lock().unwrap();
                    if local_store.is_none() {
                        *local_store = Some(store.clone());
                        STORE_CLONES.lock().unwrap().push(arc_mtx.clone());
                    }
                    tx.verify_and_store(local_store.as_mut().unwrap())
                })
            })
            .collect();
    
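        // Drop every clone now, so their Drop logic (e.g. flushing) runs and
        // the next call starts from fresh clones of its own `store`.
        let mut store_clones = STORE_CLONES.lock().unwrap();
        for shared in store_clones.drain(..) {
            shared.lock().unwrap().take();
        }
    }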
    

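The register-and-reset idea can be isolated in a std-only sketch. Here the calling thread stands in for a Rayon worker that survives between batches, a `u32` stands in for `Store`, `process_item`, `run_batch`, and `REGISTRY` are hypothetical names, and the `NO_REENTRY` guard is omitted because only one thread is involved. Since `Mutex::new` is a `const fn`, the registry can be a plain `static` without `LazyLock`.

```rust
use std::sync::{Arc, Mutex};

type SharedSlot = Arc<Mutex<Option<u32>>>;

// Registry of every per-thread slot filled during the current batch.
static REGISTRY: Mutex<Vec<SharedSlot>> = Mutex::new(Vec::new());

thread_local! {
    static SLOT: SharedSlot = SharedSlot::default();
}

fn process_item(store: u32, item: u32) -> u32 {
    SLOT.with(|slot| {
        let mut local = slot.lock().unwrap();
        if local.is_none() {
            // Stands in for `store.clone()`; register the slot for cleanup.
            *local = Some(store);
            REGISTRY.lock().unwrap().push(slot.clone());
        }
        local.unwrap() + item
    })
}

fn run_batch(store: u32, items: &[u32]) -> Vec<u32> {
    // Stands in for the `par_iter()` loop, run on one long-lived thread.
    let out = items.iter().map(|&i| process_item(store, i)).collect();
    // Reset every registered slot so the next batch clones its own `store`.
    for slot in REGISTRY.lock().unwrap().drain(..) {
        slot.lock().unwrap().take();
    }
    out
}
```

Calling `run_batch(10, &[1, 2])` and then `run_batch(20, &[1])` yields `[11, 12]` and then `[21]`; without the draining loop, the second batch would still see `10` and return `[11]`.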