java multithreading reentrantlock extrinsic-parameters

ReentrantLock threads terminating randomly


I have been working on a school assignment about multithreading in Java. One of the tasks I am stuck on requires us to create multiple threads in different groups. Only once there are 4 threads in a group can they be released to work in unison; until then they have to be put on hold/waiting.

That is the general task, which I have finished. The task I am working on now requires us to release the INITIAL first 4 threads of a group, while the rest should wait until 4 of the previous threads have called finished().

For example, 3 threads join group 65 and are put on wait. Another thread joins group 65 and all 4 threads are released together. Now 4 threads are working (terminated). Now threads e, f, g, h, i, j, k, l join group 65. All of them are put on wait until e, f, g, h have called the finished() method.

Here is what I have done so far:

ExtrinsicSync.java:

import java.util.HashMap;
import java.util.concurrent.locks.ReentrantLock;

public class ExtrinsicSync {

    private HashMap<Integer, ConditionWrapper> groupThreadCount;
    private ReentrantLock monitor;
    private int count = 0;
    private int waitcount = 0; // used in the "on wait" log message below

    ExtrinsicSync() {
        groupThreadCount = new HashMap<>();
        monitor = new ReentrantLock();
    }

@Override
public void waitForThreadsInGroup(int groupId) {
    monitor.lock();

    if (!groupThreadCount.containsKey(groupId))
        groupThreadCount.put(groupId, new ConditionWrapper(monitor.newCondition()));

    ConditionWrapper condWrapper = groupThreadCount.get(groupId);
    condWrapper.setValue(condWrapper.getValue() + 1);

    if(condWrapper.getValue() == 4 && condWrapper.getInitialStatus())
    {
        condWrapper.getCondition().signalAll();
        condWrapper.setInitialStatus(false);

        System.out.println("Terminating group: " + groupId + " FROM INITIAL STATE: " + ++count);
    } else {
        System.out.println("Putting thread from group: " + groupId + " on wait: " + ++waitcount);
        try { condWrapper.getCondition().await(); }
        catch (InterruptedException e) { e.printStackTrace(); }

    }

    monitor.unlock();
}

@Override
public void finished(int groupId) {
    monitor.lock();
    ConditionWrapper condWrapper = groupThreadCount.get(groupId);

    if(!condWrapper.getInitialStatus())
    {
        condWrapper.setFinishedCount(condWrapper.getFinishedCount() + 1);
        System.out.println("Group: " + groupId + " FINISHED COUNT: " + condWrapper.getFinishedCount());
        if(condWrapper.getFinishedCount() == 4)
        {
            condWrapper.setFinishedCount(0);
            condWrapper.getCondition().signalAll();
            System.out.println("Terminating threads for group: " + groupId + ": " + ++count);
        }
    }
    monitor.unlock();
}
}

ExtrinsicSyncTest.java:

import org.junit.Test;

import java.util.EnumMap;

class TestTask1 implements Runnable{

    final int group;
    final ExtrinsicSync s1;

    TestTask1(int group, ExtrinsicSync s1)
    {
        this.group = group;
        this.s1 = s1;
    }

    public void run() { s1.waitForThreadsInGroup(group); s1.finished(group); }
}

public class ExtrinsicSyncTest {

    @Test
    public void testPhaseThreethreads() {

        int nThreads = 22;

        Thread t[] = new Thread[nThreads];
        final ExtrinsicSync s1 = new ExtrinsicSync();

        for(int i = 0; i < nThreads/2; i++)
            (t[i] = new Thread(new TestTask1(66, s1))).start();

        for(int i = nThreads/2; i < nThreads; i++)
            (t[i] = new Thread(new TestTask1(70, s1))).start();

        for (Thread ti : t)
        {
            try { ti.join(100); }
            catch (Exception e) { System.out.println(e); }
        }

        EnumMap<Thread.State, Integer> threadsInThisState = new EnumMap<>(Thread.State.class);

        for (Thread.State s : Thread.State.values())
            threadsInThisState.put(s, 0);

        for (Thread ti : t)
        {
            Thread.State state = ti.getState();
            int n = threadsInThisState.get(state);
            threadsInThisState.put(state, n + 1);
        }

        System.out.println("threadsInThisState: " + threadsInThisState.toString() );

    }
}

ConditionWrapper.java:

import java.util.concurrent.locks.Condition;

public class ConditionWrapper {
    private Condition cond;
    private Integer value;
    private Integer finishedCount;
    private boolean initialThreads;

    public ConditionWrapper(Condition condition)
    {
        this.cond = condition;
        this.value = 0;
        this.finishedCount = 0;
        this.initialThreads = true;
    }
    // Returns the condition object of current request
    public Condition getCondition()
    {
        return this.cond;
    }
    // Gets the current counter of threads waiting in this queue.
    public Integer getValue()
    {
        return this.value;
    }
    // Sets the given value. Used for resetting the counter.
    public void setValue(int value) { this.value = value; }
    // Sets the counter to help keep track of threads which called finished() method
    public void setFinishedCount(int count) { this.finishedCount = count; }
    // Gets the finished count.
    public Integer getFinishedCount() { return this.finishedCount; }
    // This flag is to identify initial threads of a group
    public boolean getInitialStatus() { return initialThreads; }
    public void setInitialStatus(boolean val) { this.initialThreads = val; }
}

The problem I am having is that I am able to release the first four threads of every group, but somewhere 2 extra threads are being terminated, seemingly at random, and I cannot figure out what is going on. For example, with the 22-thread test case above, divided into two groups, only 8 threads should be terminated while the rest of them wait.

But here 10 threads are being terminated instead. I do not understand what is going on. I have stripped the code down to the bare minimum as best I could.


Solution

  • The problem is that for the non-initial threads (getInitialStatus() == false) you do not signal the other threads, but you still let a thread terminate once the count reaches four. So this is what happens:

    1. the first three threads increase the count and wait
    2. the fourth thread reaches count == 4, sets initial = false, signals all the other threads and sets the count to zero
    3. the next three threads increase the count by one each
    4. the eighth thread reaches count == 4 and terminates. Since getInitialStatus() == false, this thread does not notify the other threads.

    So 4 * 2 threads + 2 threads get terminated, which is exactly the count you have seen in your tests.


    Here is a potential way to implement this:

    1. use a flag canExecute in each thread or task
    2. use a method calculateState to calculate the current state and set the flag to true if a thread is allowed to execute.
    3. store all threads which are waiting in a list or something similar

    So your task would look like this:

    Task
      boolean canExecute
    

    The method waitForThreadsInGroup then looks like this:

    waitForThreadsInGroup
      monitor.lock();
          add task to list
          calculateTaskState
          condition.signalAll
          while( ! task.canExecute )
          {
            condition.await
          }

      monitor.unlock();
    

    The finish method looks similar:

    finish
      monitor.lock();
      decrement finish count
      calculateTaskState
      condition.signalAll
      monitor.unlock();
    

    And calculateTaskState

    calculateTaskState
      if( finishCount == 0)
      {
          if( taskList.size >= 4  )
          {
             set 4 tasks in this list to can execute and remove them from the list
             set finishCount to 4, since these 4 tasks must all call finish before the next release
          }
      }
    

    So the trick is to separate the logic into three steps:

    1. the action, for example reducing the finish count
    2. the calculation of the new state, deciding for each thread whether it is allowed to execute
    3. the waiting of the threads: each thread needs to wait on its own flag
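
The three steps above could be sketched in Java roughly as follows. This is only one possible reading of the pseudocode, not the assignment's required API: the names GroupGate, Task, Group, BATCH and runDemo are made up for illustration, a single shared Condition is used for all groups, and the wait ignores interrupts (awaitUninterruptibly) to keep the bookkeeping short. The running field plays the role of the finish count: it is set to 4 when a batch is released and decremented by finished().

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: all names here are illustrative assumptions.
class GroupGate {
    private static final int BATCH = 4;

    // One waiting thread. The flag is only read or written while holding the lock.
    private static final class Task {
        boolean canExecute;
    }

    // Per-group bookkeeping.
    private static final class Group {
        final Queue<Task> waiting = new ArrayDeque<>();
        int running; // released threads that have not called finished() yet
    }

    private final ReentrantLock monitor = new ReentrantLock();
    private final Condition stateChanged = monitor.newCondition();
    private final Map<Integer, Group> groups = new HashMap<>();

    public void waitForThreadsInGroup(int groupId) {
        monitor.lock();
        try {
            Group g = groups.computeIfAbsent(groupId, id -> new Group());
            Task me = new Task();
            g.waiting.add(me);          // step 1: the action
            calculateState(g);          // step 2: decide who may run
            stateChanged.signalAll();
            while (!me.canExecute) {    // step 3: wait on this thread's own flag
                // Simplification: interrupts are ignored so the queue stays consistent.
                stateChanged.awaitUninterruptibly();
            }
        } finally {
            monitor.unlock();
        }
    }

    public void finished(int groupId) {
        monitor.lock();
        try {
            Group g = groups.get(groupId);
            if (g == null || g.running == 0) {
                return; // nothing has been released for this group
            }
            g.running--;                // step 1: the action
            calculateState(g);          // step 2: decide who may run
            stateChanged.signalAll();
        } finally {
            monitor.unlock();
        }
    }

    // Must be called with the lock held.
    private void calculateState(Group g) {
        if (g.running == 0 && g.waiting.size() >= BATCH) {
            for (int i = 0; i < BATCH; i++) {
                g.waiting.remove().canExecute = true;
            }
            g.running = BATCH;
        }
    }

    // Demo helper: starts nThreads in one group and returns how many got through.
    static int runDemo(int nThreads) {
        GroupGate gate = new GroupGate();
        AtomicInteger done = new AtomicInteger();
        Thread[] threads = new Thread[nThreads];
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new Thread(() -> {
                gate.waitForThreadsInGroup(66);
                gate.finished(66);
                done.incrementAndGet();
            });
            threads[i].setDaemon(true); // leftover waiters must not keep the JVM alive
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return done.get();
    }
}
```

With 11 threads in one group, as in each half of the test above, GroupGate.runDemo(11) should let two batches of 4 through and leave 3 threads waiting, because a release only happens when no released thread is still running and at least 4 threads are queued. Because every thread loops on its own flag, a signalAll meant for another batch or another group cannot release it early.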