In my current application I receive spectral data from a spectrometer. The data is accumulated for one second and then put into a circular buffer. For now I have one consumer, which pops entries from the buffer and saves everything to disk. All of that works.
Now I need to add another consumer that processes the spectra in parallel with the saving. So I have two consumers that need exactly the same data (note: they only read it and don't modify it). This doesn't work as is, because once one consumer pops an entry from the buffer it is gone, so the other never receives it.
I guess the simplest solution is to give every consumer its own circular buffer. The only problem is that the entries are big: one entry has a maximum size of around 80 MB, so to save memory it would be great not to hold the same data twice. Is there a better solution?
Note: I am using a circular buffer to ensure that the buffer's growth is bounded.
I hope you're receiving your data directly into the queue and not copying it around much.
Any valid solution that keeps a single copy of the data has to synchronize all the consumers, so that an entry is popped only once every one of them is done with it.
You can keep your circular buffer. You only need a single remover that removes an entry once the readers are done with it. I strongly suggest that this remover be the writer of the data. That way it is the only one with write access to the queue, which simplifies things.
The consumers can feed the remover by telling it what they are done with.
Consumers can share their read offsets with the remover. Store each offset in a std::atomic<long>, then use atomic_store on the consumer side and atomic_load on the remover side.
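As a minimal sketch of that offset sharing, assuming each consumer owns exactly one cursor and is its only writer: the names `ReadCursor`, `publish` and `lowest_offset` are made up for illustration and are not part of any library.

```cpp
#include <algorithm>
#include <atomic>
#include <climits>
#include <vector>

// Hypothetical helper: one cursor per consumer, written only by that consumer.
struct ReadCursor {
    std::atomic<long> offset{0};

    // Consumer side: announce that everything before new_offset has been processed.
    void publish(long new_offset) {
        offset.store(new_offset, std::memory_order_release);
    }
};

// Remover side: the lowest offset published by any consumer.
// Data before this point is no longer needed by anyone.
// With no cursors registered it returns 0, i.e. nothing may be removed.
long lowest_offset(const std::vector<const ReadCursor*>& cursors) {
    if (cursors.empty()) return 0;
    long lowest = LONG_MAX;
    for (const ReadCursor* c : cursors) {
        lowest = std::min(lowest, c->offset.load(std::memory_order_acquire));
    }
    return lowest;
}
```

The remover may then free everything below `lowest_offset(...)`. The slowest consumer decides how far the buffer can be emptied, so a stalled consumer will eventually stall the writer as well.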
The overall structure should be something like this:
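struct Consumer {
    ...
    // needs <atomic>; written only by this consumer, read by the remover
    std::atomic<long> offset{0};
    ...
    Consumer() {
        q.remover->add(this);
    }
    ...
    void run() {
        for (;;) {
            entry& e = q.read(offset);
            process(e);
            // publish progress only after processing, so the remover
            // never frees an entry that is still in use
            std::atomic_store(&offset, offset + static_cast<long>(e.size()));
        }
    }
};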
struct Remover {
    ...
    long remove_offset = 0;
    std::list<Consumer*> cons;
    ...
    void remove() {
        // with no consumers registered, nothing is known to be safe to remove
        if (cons.empty()) return;
        // find lowest read point (LONG_MAX comes from <climits>;
        // Consumer::offset must be a std::atomic<long>)
        long cons_offset = LONG_MAX;
        for (auto p : cons) {
            cons_offset = std::min(cons_offset, std::atomic_load(&p->offset));
        }
        // remove up to that point
        while (cons_offset > remove_offset) {
            entry& e = q.read(remove_offset);
            remove_offset += e.size();
            q.remove(e.size());
        }
    }
};
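To try the idea end to end, here is a self-contained sketch with one writer (which is also the remover) and two readers. It makes several simplifying assumptions that are not in the code above: an `int` stands in for the large spectrum, positions count entries rather than bytes, waiting is done by yielding instead of a condition variable, and all names (`SharedRing`, `produce`, `consume`, `run_demo`, `kSlots`, `kConsumers`) are hypothetical.

```cpp
#include <algorithm>
#include <array>
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

constexpr std::size_t kSlots = 4;  // assumed ring capacity
constexpr int kConsumers = 2;

struct SharedRing {
    std::array<int, kSlots> slots{};                         // stand-in for the big entries
    std::atomic<long> write_index{0};                        // written only by the producer
    std::array<std::atomic<long>, kConsumers> read_index;    // one per consumer

    SharedRing() {
        for (auto& r : read_index) r.store(0);
    }

    // Position of the slowest consumer; everything before it may be reused.
    long lowest_read() const {
        long lowest = read_index[0].load(std::memory_order_acquire);
        for (const auto& r : read_index) {
            lowest = std::min(lowest, r.load(std::memory_order_acquire));
        }
        return lowest;
    }

    // Producer and remover in one: a slot is overwritten only after
    // every consumer has moved past it.
    void produce(int value) {
        const long w = write_index.load(std::memory_order_relaxed);
        while (w - lowest_read() >= static_cast<long>(kSlots)) {
            std::this_thread::yield();  // ring full: wait for the slowest consumer
        }
        slots[static_cast<std::size_t>(w) % kSlots] = value;
        write_index.store(w + 1, std::memory_order_release);
    }

    // Consumer `id` reads the entry in place, then publishes its progress.
    int consume(int id) {
        const long r = read_index[id].load(std::memory_order_relaxed);
        while (write_index.load(std::memory_order_acquire) <= r) {
            std::this_thread::yield();  // nothing new yet
        }
        const int value = slots[static_cast<std::size_t>(r) % kSlots];
        read_index[id].store(r + 1, std::memory_order_release);
        return value;
    }
};

// Runs one producer and kConsumers consumers; returns what each consumer saw.
std::array<std::vector<int>, kConsumers> run_demo(int count) {
    SharedRing ring;
    std::array<std::vector<int>, kConsumers> seen;
    std::vector<std::thread> consumers;
    for (int id = 0; id < kConsumers; ++id) {
        consumers.emplace_back([&ring, &seen, id, count] {
            for (int i = 0; i < count; ++i) seen[id].push_back(ring.consume(id));
        });
    }
    for (int i = 0; i < count; ++i) ring.produce(i);
    for (auto& t : consumers) t.join();
    return seen;
}
```

Calling `run_demo(100)` from `main` should leave both vectors holding the same 100 values in production order, even though the ring only ever holds four entries and each entry exists once. In a real application the yield loops would probably be replaced by a condition variable or semaphore so that idle threads actually sleep.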