I am learning about concurrency, and I tried to write a program that demonstrates the happens-before relationship provided by a concurrent collection. As stated in the documentation of the java.util.concurrent package:
The methods of all classes in java.util.concurrent and its subpackages extend these guarantees to higher-level synchronization. In particular: Actions in a thread prior to placing an object into any concurrent collection happen-before actions subsequent to the access or removal of that element from the collection in another thread.
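To illustrate the quoted guarantee in isolation, here is a minimal sketch. The class name PublishViaQueue, the method handOff, and the field plainValue are made up for this illustration and do not appear in my program. A producer thread writes a plain (non-volatile) field and then places an element into a LinkedBlockingQueue; the consumer reads the field only after it has removed that element.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, used only to illustrate the quoted guarantee.
class PublishViaQueue {
    // Plain field on purpose: neither volatile nor guarded by a lock.
    static int plainValue = 0;

    // Starts a producer thread, waits for its element, then reads the plain field.
    static int handOff() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            plainValue = 42;      // action prior to placing the element
            queue.add("ready");   // placing the element into the collection
        });
        producer.start();
        try {
            queue.take();         // removal of that element in another thread
            return plainValue;    // ordered after the producer's write
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Observed value: " + handOff());
    }
}
```

Because the write to plainValue precedes queue.add() in the producer, and the read follows queue.take() in the consumer, the consumer should observe 42 even though the field itself has no synchronization.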
To test this, I wrote the following class:
import java.util.Deque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

public class HappensBefore {
    static int checks = 0;
    static int shouldNotHappen = 0;
    static Deque<Integer> syncList = new LinkedBlockingDeque<>();
    public static boolean varToHappenBefore = false; // this var must be false when a new element is added to the collection
    static AtomicBoolean flag = new AtomicBoolean();

    static class SyncTask implements Runnable {
        int localTemp = -1;
        private final Thread t = new Thread(new Counter());

        @Override
        public void run() {
            t.start();
            while (syncList.isEmpty()) { // just skip
            }
            while (true) {
                if (!Thread.interrupted()) {
                    if (flag.get()) {
                        int r = syncList.peekLast();
                        if (r != localTemp) {
                            if (varToHappenBefore) {
                                shouldNotHappen++;
                            }
                            varToHappenBefore = true;
                            localTemp = r;
                            checks++;
                        }
                        flag.set(false);
                    }
                } else {
                    t.interrupt();
                    break;
                }
            }
        }
    }

    static class Counter implements Runnable {
        int ctr = 0;

        @Override
        public void run() {
            while (!Thread.interrupted()) {
                if (!flag.get()) {
                    flag.set(true);
                    varToHappenBefore = false;
                    syncList.add(ctr++);
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SyncTask st = new SyncTask();
        Thread s = new Thread(st);
        s.start();
        Thread.sleep(10000); // runtime ms
        s.interrupt();
        // s1.interrupt();
        System.out.println("Elems times added: " + checks);
        System.out
                .println("Happens-before violated times: " + shouldNotHappen);
    }
}
What I do is launch thread1 (SyncTask), which launches thread2 (Counter). Thread1 checks the public boolean varToHappenBefore, which is initially false. When thread1 reads a new element from the collection, it sets this boolean to true. If the boolean is still true when the next new element is received, a happens-before violation has occurred, and shouldNotHappen is incremented.
Thread1 checks whether the concurrent collection has a new element; if so, it saves the element in a temporary variable and increments the overall counter checks. Then it resets the atomic boolean flag to let thread2 add the next element. Before adding a new element, thread2 sets varToHappenBefore to false. Because of the atomic boolean flag, thread2 does not run its block again until thread1 has run. Note that thread2 sets the flag before it resets varToHappenBefore and adds the element; those two operations (adding the element and toggling the boolean) are meant to be guarded by the atomic flag. This way I ensure that thread2 runs only once after each run of thread1. If the write to varToHappenBefore in thread2 happens-before the addition of the element, and thread1 checks varToHappenBefore only after reading a new element from the collection, then shouldNotHappen must remain 0 when the program ends. But I get the following results:
Elems times added: ~10 000 000
Happens-before violated times: 0-10
I am probably doing something wrong; multithreading is subtle and complex. I hope you can help.
Edit:
I need to avoid the situation where thread1 clears the flag (flag.set(false)) right after it becomes true and before getting the element from the collection, because thread2 can then run between thread1 getting the element from the collection and setting varToHappenBefore = true. If I use an AtomicBoolean.compareAndSet() check, I get 80k failures per 8 million, which is predictable and clear. But even when I leave no such window for thread2 to add additional elements between reading from the collection and setting the boolean, I still get a few sequential reads where the boolean is true and a new element has appeared.
Your SyncTask clears flag even if it finds that the list has not changed yet, so you may simply miss list addition events. This makes your assumptions incorrect:
1. SyncTask finds the list changed and assumes Counter has finished its task; it performs the check (successfully) and rearms varToHappenBefore. But Counter actually adds its element to the list after that.
2. SyncTask finds the list changed again and assumes Counter has finished its task; it checks varToHappenBefore and finds that it has not been cleared. This is simply because Counter has not cleared it yet.
Just move flag.set(false); into the if (r != localTemp) block, and everything will work.
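A minimal sketch of that change follows. It makes several assumptions that are not in your code: the class name HappensBeforeFixed and the method run are invented for illustration, both loops are bounded by an element count instead of being stopped by interruption, and a null check replaces the initial isEmpty() wait. The only change that matters is where flag.set(false) sits.

```java
import java.util.Deque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical bounded variant of the question's program with the fix applied.
class HappensBeforeFixed {
    static final Deque<Integer> syncList = new LinkedBlockingDeque<>();
    static final AtomicBoolean flag = new AtomicBoolean();
    static boolean varToHappenBefore = false; // still a plain field

    // Hands over `elements` values and returns how often the check failed.
    static int run(int elements) {
        syncList.clear();
        flag.set(false);
        varToHappenBefore = false;
        int[] violations = {0};

        Thread counter = new Thread(() -> {
            int ctr = 0;
            while (ctr < elements) {
                if (!flag.get()) {
                    flag.set(true);
                    varToHappenBefore = false;
                    syncList.add(ctr++);
                }
            }
        });

        Thread syncTask = new Thread(() -> {
            int localTemp = -1;
            int seen = 0;
            while (seen < elements) {
                if (flag.get()) {
                    Integer r = syncList.peekLast(); // null while the list is still empty
                    if (r != null && r != localTemp) {
                        if (varToHappenBefore) {
                            violations[0]++;
                        }
                        varToHappenBefore = true;
                        localTemp = r;
                        seen++;
                        // The fix: release Counter only after a new element was seen.
                        flag.set(false);
                    }
                }
            }
        });

        counter.start();
        syncTask.start();
        try {
            counter.join();
            syncTask.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return violations[0]; // safe to read: join() orders it after the thread's writes
    }

    public static void main(String[] args) {
        System.out.println("Happens-before violated times: " + run(100_000));
    }
}
```

With the flag cleared only inside the if block, Counter cannot start its next round until SyncTask has actually observed the previous element, so the two threads strictly alternate and every access to varToHappenBefore is ordered either by the collection or by the atomic flag.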