I'm trying to gain a better understanding of how to control memory order when coding for multiple threads. I've used mutexes a lot in the past to serialize variable access, but I'm trying to avoid those where possible to improve performance.
I have a queue of pointers that may be filled by many threads and consumed by many threads. It works fine with a single thread, but crashes when I run with multiple threads. It looks like the consumers may be getting duplicates of the pointers, which causes them to be freed twice. It's a little hard to tell, since as soon as I put in any print statements it runs fine without crashing.
To start with, I'm using a pre-allocated vector to hold the pointers. I keep three atomic index variables to track which elements in the vector need processing. It may be worth noting that I also tried a _queue type where the elements themselves were atomic, but that did not seem to help. Here is the simpler version:
std::atomic<uint32_t> iread;
std::atomic<uint32_t> iwrite;
std::atomic<uint32_t> iend;
std::vector<JEvent*> _queue;

// Write to _queue (may be thread 1,2,3,...)
while( !_done ){
    uint32_t idx   = iwrite.load();
    uint32_t inext = (idx+1) % _queue.size();
    if( inext == iread.load() ) return kQUEUE_FULL;

    // Claim slot idx by advancing iwrite
    if( iwrite.compare_exchange_weak(idx, inext) ){
        _queue[idx] = jevent; // jevent is JEvent* passed into this method

        // Publish the slot to readers by advancing iend
        while( !_done ){
            if( iend.compare_exchange_weak(idx, inext) ) break;
        }
        break;
    }
}
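The intent is that the three indices split the ring into regions: slots in [iread, iend) hold published events waiting to be consumed, slots in [iend, iwrite) have been claimed by writers but not yet published, and the remainder are free. A small sketch of the counts implied by that invariant (the helper names are mine, not part of the original code):

// Hypothetical helpers illustrating the intended three-index invariant;
// N is the ring capacity (_queue.size()) and all arithmetic is modulo N.
uint32_t readable_slots(uint32_t iread, uint32_t iend, uint32_t N){
    return (iend + N - iread) % N;   // published, waiting for a reader
}
uint32_t claimed_slots(uint32_t iend, uint32_t iwrite, uint32_t N){
    return (iwrite + N - iend) % N;  // claimed by a writer, not yet published
}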
And, from the same class:
// Read from _queue (may be thread 1,2,3,...)
while( !_done ){
    uint32_t idx = iread.load();
    if( idx == iend.load() ) return NULL; // nothing published yet

    JEvent *Event = _queue[idx];
    uint32_t inext = (idx+1) % _queue.size();

    // Claim the slot for this reader by advancing iread
    if( iread.compare_exchange_weak(idx, inext) ){
        _nevents_processed++;
        return Event;
    }
}
I should emphasize that I am really interested in understanding why this doesn't work. Implementing some other pre-made package would get me past this problem, but would not help me avoid making the same type of mistakes again later.
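For anyone who wants to reproduce this without print statements (which perturb the timing enough to hide the crash), below is a minimal, self-contained stress sketch of the same algorithm. It is my reconstruction, not the original class: it pushes pointers into a static payload array instead of real JEvents and counts duplicate consumptions instead of freeing them, so it detects the race without the double-free crash.

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <thread>
#include <vector>

constexpr uint32_t N       = 8;        // ring capacity
constexpr int      NEVENTS = 200000;   // events per writer

std::atomic<uint32_t> iread{0}, iwrite{0}, iend{0};
std::vector<int*> queue_(N, nullptr);  // stands in for _queue
int payload[2*NEVENTS];                // stands in for the JEvent objects
std::atomic<int>  seen[2*NEVENTS];     // times each event was consumed
std::atomic<long> duplicates{0};
std::atomic<bool> done{false};

bool write_event(int* ev){             // same logic as the "Write" section
    for(;;){
        uint32_t idx = iwrite.load();
        uint32_t inext = (idx+1) % N;
        if( inext == iread.load() ) return false;  // queue full
        if( iwrite.compare_exchange_weak(idx, inext) ){
            queue_[idx] = ev;
            while( !iend.compare_exchange_weak(idx, inext) ){} // idx mutates on failure!
            return true;
        }
    }
}

int* read_event(){                     // same logic as the "Read" section
    for(;;){
        uint32_t idx = iread.load();
        if( idx == iend.load() ) return nullptr;   // queue empty
        int *ev = queue_[idx];
        uint32_t inext = (idx+1) % N;
        if( iread.compare_exchange_weak(idx, inext) ) return ev;
    }
}

int main(){
    auto writer = [](int base){
        for(int i=0; i<NEVENTS; i++)
            while( !write_event(&payload[base+i]) ) std::this_thread::yield();
    };
    auto reader = []{
        for(;;){
            int *ev = read_event();
            if( !ev ){                 // empty (or an unpublished slot)
                if( done.load() ) return;
                std::this_thread::yield();
                continue;
            }
            if( seen[ev-payload].fetch_add(1) > 0 ) duplicates.fetch_add(1);
        }
    };
    std::thread r1(reader), r2(reader), w1(writer, 0), w2(writer, NEVENTS);
    w1.join(); w2.join();
    done = true;
    r1.join(); r2.join();
    std::printf("duplicate reads: %ld\n", duplicates.load());
}

On my reading of the algorithm, any nonzero duplicate count here corresponds to the double frees seen in the real code.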
UPDATE: I'm marking Alexandr Konovalov's answer as correct (see my comment on his answer below). In case anyone comes across this page, the corrected code for the "Write" section is:
std::atomic<uint32_t> iread;
std::atomic<uint32_t> iwrite;
std::atomic<uint32_t> iend;
std::vector<JEvent*> _queue;

// Write to _queue (may be thread 1,2,3,...)
while( !_done ){
    uint32_t idx   = iwrite.load();
    uint32_t inext = (idx+1) % _queue.size();
    if( inext == iread.load() ) return kQUEUE_FULL;

    if( iwrite.compare_exchange_weak(idx, inext) ){
        _queue[idx] = jevent; // jevent is JEvent* passed into this method

        // On failure, compare_exchange_weak overwrites idx with the current
        // value of iend, so idx must be restored before each retry
        uint32_t save_idx = idx;
        while( !_done ){
            if( iend.compare_exchange_weak(idx, inext) ) break;
            idx = save_idx;
        }
        break;
    }
}
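Since the question is specifically about controlling memory order: with the defaults above, every atomic operation is std::memory_order_seq_cst, which is correct but stronger than strictly needed. The essential pairing is that the store to _queue[idx] must become visible before the iend advance that publishes it, and the reader must observe them in that order. As a sketch (the explicit orderings are my annotation, not part of the corrected answer; the other operations are left at seq_cst), the publish/consume pair could be written as:

// Writer side: release on success orders the preceding _queue[idx] store
// before the iend advance; the failure case only loads, so relaxed suffices.
uint32_t save_idx = idx;
while( !_done ){
    if( iend.compare_exchange_weak(idx, inext,
                                   std::memory_order_release,
                                   std::memory_order_relaxed) ) break;
    idx = save_idx;
}

// Reader side: an acquire load of iend synchronizes with the writer's
// release, so the subsequent _queue[idx] read sees the published pointer.
if( idx == iend.load(std::memory_order_acquire) ) return NULL;

Because each successful advance is a read-modify-write, later writers' advances continue the release sequence headed by earlier ones, so a reader that acquires a later iend value still sees earlier writers' stores.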
To me, one possible issue occurs when there are 2 writers and 1 reader. Suppose the 1st writer stops just before

_queue[0] = jevent;

and the 2nd writer signals via iend that its _queue[1] is ready to be read. Because the 2nd writer's failed compare_exchange_weak on iend overwrites its expected value with 0, its retry advances iend from 0 straight to 2. The reader then sees via iend that _queue[0] is ready to be read, so we have a data race.
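The mechanism is easiest to see in a single-threaded replay of that interleaving (a sketch I constructed, not code from the question; compare_exchange_strong is used so each step is deterministic, but the weak variant updates its expected argument the same way on failure):

#include <atomic>
#include <cassert>
#include <cstdint>
#include <vector>

int main(){
    std::atomic<uint32_t> iwrite{0}, iend{0};
    std::vector<int*> _queue(4, nullptr);
    int event2 = 2;

    // Writer 1 claims slot 0 by advancing iwrite 0 -> 1, then is
    // preempted before it stores _queue[0].
    uint32_t w1_idx = 0;
    iwrite.compare_exchange_strong(w1_idx, 1);   // succeeds

    // Writer 2 claims slot 1 (iwrite 1 -> 2) and stores its event.
    uint32_t w2_idx = 1;
    iwrite.compare_exchange_strong(w2_idx, 2);   // succeeds
    _queue[1] = &event2;

    // Writer 2 runs the original publish loop. The first CAS fails because
    // iend is still 0, and on failure compare_exchange overwrites w2_idx
    // with iend's current value (0) ...
    bool ok = iend.compare_exchange_strong(w2_idx, 2);
    assert(!ok && w2_idx == 0);

    // ... so the retry succeeds with expected == 0 and publishes iend = 2,
    // skipping writer 1's still-unwritten slot 0.
    ok = iend.compare_exchange_strong(w2_idx, 2);
    assert(ok && iend.load() == 2);

    // A reader comparing iread (0) with iend (2) now believes _queue[0]
    // holds a valid event, but writer 1 never stored it.
    assert(_queue[0] == nullptr);
    return 0;
}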
I recommend you try Relacy Race Detector, which is ideally suited to exactly this kind of analysis.
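Relacy replaces std::atomic and plain shared variables with instrumented versions and exhaustively explores thread interleavings, reporting races like the one above. A minimal skeleton, adapted from memory of Relacy's canonical example (verify the exact macros and namespaces against the headers you download):

#include <relacy/relacy_std.hpp>

// Every shared access goes through the $ macro so Relacy can record it
// and explore the possible interleavings.
struct race_test : rl::test_suite<race_test, 2>
{
    std::atomic<int> a;
    rl::var<int> x;

    void before()
    {
        a($) = 0;
        x($) = 0;
    }

    void thread(unsigned thread_index)
    {
        if (0 == thread_index) {
            x($) = 1;
            a($).store(1, rl::mo_relaxed);
        } else if (1 == a($).load(rl::mo_relaxed)) {
            x($) = 2;   // Relacy reports the data race on x here
        }
    }
};

int main()
{
    rl::simulate<race_test>();
}

The queue above would be tested the same way: move the three indices and the vector into a test_suite struct, run the write path in some threads and the read path in others, and let the simulator search for an interleaving that hands out the same slot twice.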