I need to store a complex struct as global config. The struct will be read often and written rarely, so I am exploring whether I can use `crossbeam_epoch::Atomic` to store the config and share it across threads. I was testing whether reads and writes through `crossbeam_epoch::Atomic` are atomic.

I created a `Config` struct with a `version: i32` field. I spawned 5 threads, and each thread incremented the config's version by 1, 100 times in a tight loop. I expected the result to be 500 once all threads had finished, but the results were random.
```rust
use crossbeam_epoch as epoch;
use std::sync::atomic::Ordering;
use std::{sync::Arc, thread};

#[derive(Debug)]
struct Config {
    version: i32,
}

fn run() -> () {
    let atomic = Arc::new(epoch::Atomic::new(Config { version: 0 }));
    let mut handles = vec![];
    let per_thread_itr = 100;
    let num_thread = 5;
    for _ in 0..num_thread {
        let thread_handle = thread::spawn({
            let atomic = atomic.clone();
            move || {
                let guard = epoch::pin();
                for _ in 1..per_thread_itr + 1 {
                    let shared = atomic.load(Ordering::SeqCst, &guard);
                    if let Some(v) = unsafe { shared.as_ref() } {
                        atomic.store(
                            epoch::Owned::new(Config {
                                version: v.version + 1,
                            }),
                            Ordering::SeqCst,
                        );
                    }
                }
            }
        });
        handles.push(thread_handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    // Main thread accesses the updated value
    let guard = epoch::pin();
    let result = unsafe { atomic.load(Ordering::SeqCst, &guard).as_ref() }.unwrap();
    println!("Result: {}", result.version);
}

fn main() {
    for _ in 0..5 {
        run();
    }
}
```
**Result:**

```
Result: 297
Result: 398
Result: 349
Result: 235
Result: 365
```
A similar test using `std::sync::atomic::AtomicI32` with `fetch_add` produced the expected result:
```rust
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread;

fn run() {
    // Use AtomicI32 with proper atomic operations
    let atomic = Arc::new(AtomicI32::new(0));
    let mut handles = vec![];
    let per_thread_itr = 100;
    let num_thread = 5;
    for _ in 0..num_thread {
        let a = atomic.clone();
        let thread_handle = thread::spawn(move || {
            for _ in 0..per_thread_itr {
                // Safely increment the atomic value
                a.fetch_add(1, Ordering::SeqCst);
            }
        });
        handles.push(thread_handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Result: {}", atomic.load(Ordering::SeqCst));
}

fn main() {
    for _ in 0..5 {
        run();
    }
}
```
**Result:**

```
Result: 500
Result: 500
Result: 500
Result: 500
Result: 500
```
So I was wondering: does `crossbeam_epoch::Atomic` work differently from `std::sync::atomic::AtomicI32`, or am I missing something here?
`Atomic<T>` is a pointer type; it does not make access to the wrapped `T` atomic. What it does is let you operate atomically on the pointer to the heap allocation where `T` is stored. It is essentially an `AtomicUsize` whose value is a pointer to `T`.
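To make the pointer analogy concrete, here is a minimal standard-library sketch that uses `AtomicPtr<Config>` in place of `crossbeam_epoch::Atomic<Config>`. The helper names `replace_config` and `pointer_swap_demo` are made up for illustration, and the `unsafe` blocks assume that no other thread is still reading the old allocation. That is the guarantee epoch-based reclamation provides and this sketch does not.

```rust
use std::sync::atomic::{AtomicPtr, Ordering};

#[derive(Debug)]
struct Config {
    version: i32,
}

/// Hypothetical helper: installs `new` and hands back the previous Config.
/// Only the pointer swap is atomic; each Config is ordinary heap data.
fn replace_config(slot: &AtomicPtr<Config>, new: Config) -> Box<Config> {
    let new_ptr = Box::into_raw(Box::new(new));
    let old_ptr = slot.swap(new_ptr, Ordering::AcqRel);
    // SAFETY (assumption): every pointer stored in `slot` came from
    // Box::into_raw, and no other thread still reads through `old_ptr`.
    unsafe { Box::from_raw(old_ptr) }
}

/// Single-threaded demo returning (old version, current version).
fn pointer_swap_demo() -> (i32, i32) {
    let slot = AtomicPtr::new(Box::into_raw(Box::new(Config { version: 0 })));

    // "Modifying" the config means allocating a new one and swapping pointers.
    let old = replace_config(&slot, Config { version: 1 });

    let current_ptr = slot.load(Ordering::Acquire);
    // SAFETY: `current_ptr` was just installed above and is still live.
    let current_version = unsafe { (*current_ptr).version };

    // Free the last allocation so the demo does not leak.
    // SAFETY: single-threaded, so nobody else holds `current_ptr`.
    drop(unsafe { Box::from_raw(current_ptr) });

    (old.version, current_version)
}
```

Calling `pointer_swap_demo()` returns `(0, 1)`: the swap replaced the whole allocation and never touched the fields of the old `Config`.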
If you want to modify `T` in a way that depends on its previous state, you have to use a compare-and-swap (CAS) loop. Otherwise you have a race: two threads can load the same pointer, each build a new `Config` from the same old version, and each store its result, so one increment overwrites the other.
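The same lost-update pattern and its fix can be sketched with a plain `AtomicI32`, using only the standard library. The function names `cas_increment` and `run_cas` are hypothetical, and the orderings are one reasonable choice rather than the only valid one.

```rust
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use std::thread;

/// Increment with a CAS loop: the write only succeeds if the value is
/// still the one we read, otherwise we retry from the value we observed.
///
/// The racy equivalent of the question's load-then-store would be
/// `counter.store(counter.load(..) + 1, ..)`, which can lose updates.
fn cas_increment(counter: &AtomicI32) {
    let mut seen = counter.load(Ordering::Acquire);
    loop {
        match counter.compare_exchange_weak(
            seen,
            seen + 1,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => break,
            // Another thread changed the value (or the weak CAS failed
            // spuriously); retry with the value that is actually stored.
            Err(actual) => seen = actual,
        }
    }
}

/// Spawns `num_threads` threads that each increment `per_thread` times
/// and returns the final counter value.
fn run_cas(num_threads: usize, per_thread: usize) -> i32 {
    let counter = Arc::new(AtomicI32::new(0));
    let handles: Vec<_> = (0..num_threads)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    cas_increment(&counter);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    counter.load(Ordering::Acquire)
}
```

`run_cas(5, 100)` returns 500, because every failed exchange is retried and each successful exchange adds exactly 1.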
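Here's an updated version of your code using the compare-exchange approach. It consistently produces the expected result of 500. Note that, like your original, it never frees the replaced `Config` allocations; production code would typically pass the old pointer to `guard.defer_destroy` after a successful exchange.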
```rust
use crossbeam_epoch as epoch;
use std::sync::atomic::Ordering;
use std::{sync::Arc, thread};

#[derive(Debug)]
struct Config {
    version: i32,
}

fn run() {
    let atomic = Arc::new(epoch::Atomic::new(Config { version: 0 }));
    let mut handles = vec![];
    let per_thread_itr = 100;
    let num_thread = 5;
    for _ in 0..num_thread {
        let thread_handle = thread::spawn({
            let atomic = atomic.clone();
            move || {
                let guard = epoch::pin();
                for _ in 0..per_thread_itr {
                    let mut shared = atomic.load(Ordering::Acquire, &guard);
                    let mut value = unsafe { shared.as_ref() }.unwrap();
                    // Retry until no other thread has swapped the pointer
                    // between our load and our exchange.
                    while let Err(state) = atomic.compare_exchange(
                        shared,
                        epoch::Owned::new(Config {
                            version: value.version + 1,
                        }),
                        Ordering::Release,
                        Ordering::Acquire,
                        &guard,
                    ) {
                        // Another thread won; rebuild from the pointer it installed.
                        shared = state.current;
                        value = unsafe { shared.as_ref() }.unwrap();
                    }
                }
            }
        });
        handles.push(thread_handle);
    }
    for handle in handles {
        handle.join().unwrap();
    }
    // Main thread accesses the updated value
    let guard = epoch::pin();
    let result = unsafe { atomic.load(Ordering::Acquire, &guard).as_ref() }.unwrap();
    println!("Result: {}", result.version);
}

fn main() {
    for _ in 0..5 {
        run();
    }
}
```