rustconcurrency

Is crossbeam_epoch Atomic really atomic?


I have a requirement to store a complex struct as config globally. Struct will be mostly read and seldom write. So, i am exploring if i can use crossbeam_epoch::Atomic to store global config, share config across threads. I was testing if read/write to crossbeam_epoch::Atomic is atomic or not.

I created a Config struct with version:i32 property. I created 5 threads and in each threads incremented version of config by 1 for 100 times in a tight loop. I was expecting that at the completion of every thread, i'll get 500 as result. But, results were random.

use crossbeam_epoch as epoch;
use std::sync::atomic::Ordering;
use std::{sync::Arc, thread};

#[derive(Debug)]
struct Config {
    version: i32,
}

fn run() -> () {
    let atomic = Arc::new(epoch::Atomic::new(Config { version: 0 }));

    let mut handles = vec![];

    let per_thread_itr = 100;
    let num_thread = 5;

    for _ in 0..num_thread {
        let thread_handle = thread::spawn({
            let atomic = atomic.clone();
            move || {
                let guard = epoch::pin();
                for _ in 1..per_thread_itr + 1 {
                    let shared = atomic.load(Ordering::SeqCst, &guard);
                    if let Some(v) = unsafe { shared.as_ref() } {
                        atomic.store(
                            epoch::Owned::new(Config {
                                version: v.version + 1,
                            }),
                            Ordering::SeqCst,
                        );
                    }
                }
            }
        });

        handles.push(thread_handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    // Main thread accesses the updated value
    let guard = epoch::pin();
    let result = unsafe { atomic.load(Ordering::SeqCst, &guard).as_ref() }.unwrap();

    println!("Result: {}", result.version);
}

fn main() {
    for _ in 0..5 {
        run();
    }
}

Result:

Result: 297
Result: 398
Result: 349
Result: 235
Result: 365

When using std::sync::atomic::AtomicI32

While the same approach worked for std::sync::atomic::AtomicI32.

use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;

fn run() {
    // Use AtomicI32 with proper atomic operations
    let atomic = Arc::new(AtomicI32::new(0));

    let mut handles = vec![];

    let per_thread_itr = 100;
    let num_thread = 5;

    for _ in 0..num_thread {
        let a = atomic.clone();
        let thread_handle = thread::spawn(move || {
            for _ in 0..per_thread_itr {
                // Safely increment the atomic value
                a.fetch_add(1, Ordering::SeqCst);
            }
        });

        handles.push(thread_handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", atomic.load(Ordering::SeqCst));
}

fn main() {
    for _ in 0..5 {
        run();
    }
}

**Result : **

Result: 500
Result: 500
Result: 500
Result: 500
Result: 500

So, i was wondering if crossbeam_epoch::Atomic works differently than std::sync::atomic::AtomicI32 or not? Or am i missing something here?


Solution

  • Atomic<T> is a pointer type, it doesn't magically make access to wrapped T atomic by default. What it does, it allows one to atomically operate on the pointer to heap allocation where T is stored. So basically it's like AtomicUsize, where the value is a pointer to T.

    If you want to modify T in such a way that modification depends on previous state, you have to use CAS loop. Otherwise you will most certainly have a race, as modifications will interleave and result in unexpected states of T.

    Here's an updated version of your code using compare exchange approach. This always produces the result you would expect.

    use crossbeam_epoch as epoch;
    use std::sync::atomic::Ordering;
    use std::{sync::Arc, thread};
    
    #[derive(Debug)]
    struct Config {
        version: i32,
    }
    
    fn run() {
        let atomic = Arc::new(epoch::Atomic::new(Config { version: 0 }));
    
        let mut handles = vec![];
    
        let per_thread_itr = 100;
        let num_thread = 5;
    
        for _ in 0..num_thread {
            let thread_handle = thread::spawn({
                let atomic = atomic.clone();
                move || {
                    let guard = epoch::pin();
                    for _ in 0..per_thread_itr {
                        let mut shared = atomic.load(Ordering::Acquire, &guard);
                        let mut value = unsafe { shared.as_ref() }.unwrap();
    
                        while let Err(state) = atomic.compare_exchange(
                            shared,
                            epoch::Owned::new(Config {
                                version: value.version + 1,
                            }),
                            Ordering::Release,
                            Ordering::Acquire,
                            &guard,
                        ) {
                            shared = state.current;
                            value = unsafe { shared.as_ref() }.unwrap();
                        }
                    }
                }
            });
    
            handles.push(thread_handle);
        }
    
        for handle in handles {
            handle.join().unwrap();
        }
    
        // Main thread accesses the updated value
        let guard = epoch::pin();
        let result = unsafe { atomic.load(Ordering::Acquire, &guard).as_ref() }.unwrap();
    
        println!("Result: {}", result.version);
    }
    
    fn main() {
        for _ in 0..5 {
            run();
        }
    }