I'm trying to implement a fast version of LZ77 and I have a question to ask you about concurrent programming.
For now I have a final byte[] buffer and a final int[] resultHolder, both of the same length. The program does the following:
1. The main thread fills the whole buffer, then notifies the worker threads and waits for them to complete.
2. Each worker thread processes its portion of the buffer and saves the results in the same portion of the result holder. Each worker's portion is exclusive. The worker then notifies the main thread and pauses.
3. When all the workers have paused, the main thread reads the data in resultHolder and updates the buffer, then (if needed) the process begins again from point 1.
The important fields in the manager (main thread) are declared as follows:
final byte[] buffer = new byte[SIZE];
final MemoryHelper memoryHelper = new MemoryHelper();
final ArrayBlockingQueue<Object> waitBuffer = new ArrayBlockingQueue<Object>(TOT_WORKERS);
final ArrayBlockingQueue<Object> waitResult = new ArrayBlockingQueue<Object>(TOT_WORKERS);
final int[] resultHolder = new int[SIZE];
MemoryHelper simply wraps a volatile field and provides two methods: one for reading it and one for writing to it.
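For reference, a minimal sketch of what such a helper could look like. The field name, its type, and the method bodies are assumptions made for illustration; only the class name and the two method names come from the code above.

```java
// Hypothetical sketch of MemoryHelper: the field name and bodies are assumptions.
final class MemoryHelper {
    // The stored value is irrelevant; only the volatile access itself matters.
    private volatile int fence;

    // Volatile read: pairs with an earlier writeVolatile() from another thread,
    // provided this read actually observes that write.
    int readVolatile() {
        return fence;
    }

    // Volatile write: plain writes made by this thread before the call become
    // visible to a thread that later reads the field and sees this write.
    void writeVolatile() {
        fence = 1;
    }
}
```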
Worker's run() code:
public void run() {
    try {
        // Wait for the main thread
        while (manager.waitBuffer.take() != SHUTDOWN) {
            // Load new buffer values
            manager.memoryHelper.readVolatile();
            // Do something
            for (int i = a; i <= b; i++) {
                manager.resultHolder[i] = manager.buffer[i] + 10;
            }
            // Flush new values of resultHolder
            manager.memoryHelper.writeVolatile();
            // Signal job done
            manager.waitResult.add(Object.class);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of silently swallowing it
        Thread.currentThread().interrupt();
    }
}
Finally, the important part of the main thread:
for (int i = 0; i < 100_000; i++) {
    // Start workers
    for (int j = 0; j < TOT_WORKERS; j++)
        waitBuffer.add(Object.class);
    // Wait for workers
    for (int j = 0; j < TOT_WORKERS; j++)
        waitResult.take();
    // Load results
    memoryHelper.readVolatile();
    // Do something
    processResult();
    setBuffer();
    // Store buffer
    memoryHelper.writeVolatile();
}
Synchronization on ArrayBlockingQueue works well. My doubt is about the use of readVolatile() and writeVolatile(). I've been told that writing to a volatile field flushes all previously changed data to memory, and that reading it from another thread then makes those changes visible.
So is this enough to ensure correct visibility in this case? There is never truly concurrent access to the same memory areas, so a volatile field should be a lot cheaper than a ReadWriteLock.
You don't even need volatile here, because BlockingQueues already provide the necessary memory visibility guarantees:
Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a BlockingQueue happen-before actions subsequent to the access or removal of that element from the BlockingQueue in another thread.
In general, if you already have some kind of synchronization, you probably don't need to do anything special to ensure memory visibility, because it's already guaranteed by the synchronization primitives you use.
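To illustrate, here is a reduced sketch of the same hand-off with plain arrays and no volatile field at all. The class name QueueHandOffDemo, the array size, and the single-worker setup are assumptions chosen to keep the example short; the visibility comes only from the queue operations.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class: one worker, one round, no volatile anywhere.
final class QueueHandOffDemo {
    // Plain (non-volatile) shared arrays, as in the question.
    static final byte[] buffer = new byte[4];
    static final int[] resultHolder = new int[4];

    static int[] runOnce() {
        BlockingQueue<Object> waitBuffer = new ArrayBlockingQueue<>(1);
        BlockingQueue<Object> waitResult = new ArrayBlockingQueue<>(1);
        Object token = new Object();

        Thread worker = new Thread(() -> {
            try {
                waitBuffer.take();       // buffer writes made before put() are visible here
                for (int i = 0; i < buffer.length; i++) {
                    resultHolder[i] = buffer[i] + 10;
                }
                waitResult.put(token);   // publishes the resultHolder writes
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        try {
            for (int i = 0; i < buffer.length; i++) {
                buffer[i] = (byte) i;    // plain writes by the main thread
            }
            waitBuffer.put(token);       // hand the buffer to the worker
            waitResult.take();           // after this, the worker's writes are visible
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return resultHolder.clone();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runOnce())); // prints [10, 11, 12, 13]
    }
}
```

The happens-before edge runs from each put() to the matching take(), which is exactly the guarantee quoted above.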
However, volatile reads and writes can be used to ensure memory visibility when you don't have explicit synchronization (e.g. in lock-free algorithms).
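A minimal sketch of that second case, where a volatile flag is the only thing ordering the two threads. The class and field names are made up for illustration, and the busy-wait is only there to keep the example small.

```java
// Hypothetical demo: a plain field published through a volatile flag.
final class VolatilePublishDemo {
    static int payload;             // plain field, no synchronization of its own
    static volatile boolean ready;  // volatile flag that guards the payload

    static int runOnce() {
        payload = 0;
        ready = false;

        Thread writer = new Thread(() -> {
            payload = 42;           // plain write...
            ready = true;           // ...published by the volatile write that follows it
        });
        writer.start();

        // Volatile read in a loop: once it observes true, every write the
        // writer made before setting the flag is visible to this thread.
        while (!ready) {
            Thread.onSpinWait();
        }
        return payload;             // 42
    }
}
```

Note that the guarantee only holds once the reader has actually observed the volatile write; a volatile read that happens before the write establishes nothing.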
P. S.
It also looks like you could use a CyclicBarrier instead of your queue-based solution; it is designed specifically for scenarios like this.
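As a rough sketch of how that could look: the barrier action below stands in for the main thread's "read results, update buffer" step, which is a restructuring of the original design rather than a drop-in replacement. The class name, sizes, round count, and the way the buffer is updated are all assumptions for illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo of the worker/manager rounds built on CyclicBarrier.
final class BarrierDemo {
    static final int TOT_WORKERS = 2; // assumed values, chosen for the example
    static final int SIZE = 8;
    static final int ROUNDS = 3;

    static byte[] run() {
        byte[] buffer = new byte[SIZE];
        int[] resultHolder = new int[SIZE];

        // The barrier action runs once per round, after all workers have arrived
        // and before any of them is released. It sees all of their writes.
        CyclicBarrier barrier = new CyclicBarrier(TOT_WORKERS, () -> {
            for (int i = 0; i < SIZE; i++) {
                buffer[i] = (byte) resultHolder[i]; // stand-in for processResult()/setBuffer()
            }
        });

        Thread[] workers = new Thread[TOT_WORKERS];
        int chunk = SIZE / TOT_WORKERS;
        for (int w = 0; w < TOT_WORKERS; w++) {
            final int a = w * chunk;
            final int b = a + chunk;   // exclusive upper bound of this worker's portion
            workers[w] = new Thread(() -> {
                try {
                    for (int round = 0; round < ROUNDS; round++) {
                        for (int i = a; i < b; i++) {
                            resultHolder[i] = buffer[i] + 10;
                        }
                        barrier.await(); // wait for the others and for the barrier action
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[w].start();
        }

        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return buffer; // every element is 30 after three rounds of +10
    }
}
```

CyclicBarrier documents its own memory consistency effects: actions before await() happen-before the barrier action, which in turn happens-before actions after await() returns in the other threads, so no volatile helper is needed here either.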