javaconcurrencyconditional-statementsmutual-exclusionreentrantreadwritelock

Thread is not waiting for new data using Condition


I'm trying to program something for a lab that basically is a web cache that stores data in a folder so if a client wants to open a webpage, that page will be stored in the cache folder if it is not already there, and it will be showed to the client. I'm able to download and show pages, but the problem comes when a thread has to wait because the resource asked is being downloaded and stored in the cache folder.

The downloads class is the following one:

public class Downloads {
private ArrayList<DownloadsNode> nodes;
private int debug,readersCount=0,writersCount=0;
private Lock readLock=new ReentrantReadWriteLock().readLock();
private Lock writeLock=new ReentrantReadWriteLock().writeLock();    

public Downloads ( int debug )
{   nodes = new ArrayList<DownloadsNode>();
    this.debug = debug;
}


public synchronized DownloadsNode findNode ( URL url )
{   Iterator<DownloadsNode> i = nodes.iterator();
    while ( i.hasNext() )
    {   DownloadsNode node = i.next();
        if (node.getUrl().equals(url))
            return node;
    }
    return null;
}

public synchronized DownloadsNode addNode ( URL url )
{   DownloadsNode node = new DownloadsNode ( url );
    nodes.add ( node );
    return node;
}

public synchronized void deleteNode ( DownloadsNode node )
{   nodes.remove ( node );
}

public synchronized void accessAsAReader()
{
    while(writersCount>0)
    {
        try 
        {
            this.wait();
        } 
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }

    }
    readLock.lock();
    readersCount++;
}

public synchronized void accessAsAWriter()
{
    while(readersCount>0||writersCount>0)
    {
        try 
        {
            this.wait();
        } 
        catch (InterruptedException e) 
        {
            e.printStackTrace();
        }
    }
    writeLock.lock();
    writersCount++;
}

public synchronized void outAsAReader()
{
    readLock.unlock();
    readersCount--;
    this.notifyAll();
}

public synchronized void outAsAWriter()
{
    writeLock.unlock();
    writersCount--;
    this.notifyAll();
}

public Lock getreadLock()
{
    return this.readLock;
}
}

The class DownloadsNode is the following:

public class DownloadsNode {
private int waiting;
private URL url;
private cacheNode node;
private Lock readLock=new ReentrantReadWriteLock().readLock();
private Lock writeLock=new ReentrantReadWriteLock().writeLock();    
private int readersCount=0,writersCount=0;

public DownloadsNode ( URL url )
{   waiting = 1;
    this.url = url;
}

public URL getUrl()
{   return url;
}

public synchronized void setnodeC ( cacheNode node )
{   this.node = node;
}

public synchronized cacheNode getnodeC()
{   return node;
}

public synchronized int getwaiting()
{   return waiting;
}

public synchronized void addWaiting()
{   waiting++;
}


public synchronized void quitWaiting()
{   waiting--;
}

    public synchronized void accessAsAReader()
{
    while(writersCount>0)
    {
        try 
        {
            this.wait();
        } 
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }

    }
    readLock.lock();
    readersCount++;
}

public synchronized void accessAsAWriter()
{
    while(readersCount>0||writersCount>0)
    {
        try 
        {
            this.wait();
        } 
        catch (InterruptedException e) 
        {
            e.printStackTrace();
        }
    }
    writeLock.lock();
    writersCount++;
}

public synchronized void outAsAReader()
{
    readLock.unlock();
    readersCount--;
    this.notifyAll();
}

public synchronized void outAsAWriter()
{
    writeLock.unlock();
    writersCount--;
    this.notifyAll();
}

public Lock getreadLock()
{
    return this.readLock;
}
}

The cache class is the following:

public class Cache {
private HashMap<URL,cacheNode> map; 
private Lock readLock=new ReentrantReadWriteLock().readLock();
private Lock writeLock=new ReentrantReadWriteLock().writeLock();    
private boolean added=false;

public Cache( int up_edge, int lim_inf, int debug )
{   
       //The content of this doesn't matter
}

public synchronized cacheNode findNode ( URL url )
{   
    cacheNode cacheNode= map.get ( url );
    return cacheNode;
}


public synchronized cacheNode addNode ( resourceWeb resource, AtomicInteger referencias )
{   

    cacheNode node = new cacheNode ( resource, referencias );
    map.put ( resource.getUrl(), node );
    added=true;
    return node;
}

public boolean getadded()
{
    return added;
}

    public synchronized void accessAsAReader()
{
    while(writersCount>0)
    {
        try 
        {
            this.wait();
        } 
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }

    }
    readLock.lock();
    readersCount++;
}

public synchronized void accessAsAWriter()
{
    while(readersCount>0||writersCount>0)
    {
        try 
        {
            this.wait();
        } 
        catch (InterruptedException e) 
        {
            e.printStackTrace();
        }
    }
    writeLock.lock();
    writersCount++;
}

public synchronized void outAsAReader()
{
    readLock.unlock();
    readersCount--;
    this.notifyAll();
}

public synchronized void outAsAWriter()
{
    writeLock.unlock();
    writersCount--;
    this.notifyAll();
}

public Lock getreadLock()
{
    return this.readLock;
}
}

All of the previous classes are used from the following one:

public class MainThread extends Thread{
private ArrayBlockingQueue<Petition> petitionQueue;
private Cache cache;
private int debug;

//constructor goes here but its content doesn't matter

public void run()
{
    webResource webResource = null;
    while(true)
    {
        try {
            CacheNode cNode;
            do
            {

                Petition petition = PetitionQueue.take();


                //Searchs the web resource inside the cache
                cache.accessAsAReader();
                synchronized(this)
                {
                    cNode = cache.findNode ( petition.getURL() );
                }
                if ( cNode != null )    // If it's found
                {   
                    cache.outAsAReader();
                    webResource = cNode.webResource;
                }                   

                else // if it's not found
                {
                    cache.outAsAReader();

                    Downloads downloads=new Downloads(debug);
                    DownloadsNode node;
                    downloads.accessAsAReader();
                    synchronized(this)
                    {
                        node=downloads.findNode(petition.getURL());
                    }

                    **if(node!=null)
                    {
                        downloads.outAsAReader();                           
                        /*Another thread is downloading to store in the cache*/
                        /*We indicate that we are waiting. Here is my problem if I'm not wrong*/
                        node.accessAsAWriter();                     
                        synchronized(this)
                        {
                            node.incrementWaiting();
                        }
                        Condition waitingCondition=node.getReadLock().newCondition();
                        while(cache!=null&&!cache.getAdded())
                        {
                            waitingCondition.await();   
                        }
                        /*Delete from count of waiting threads*/

                        synchronized(this)
                        {
                            nodo.decrementWaiting();
                        }
                        node.outAsAWriter();
                        /*If there aren't wating threads, delete the entry from downloads list*/
                        if(node.getWaiting()==0)
                        {
                            downloads.accessAsAWriter();
                            synchronized(this)
                            {
                                downloads.delete(node);
                            }
                            downloads.outAsAWriter();
                        }
                    }
                    else
                    {
                        downloads.outAsAReader();
                        downloads.accessAsAWriter();
                        synchronized(this)
                        {
                            node=downloads.addNode(petition.getURL());
                        }
                        downloads.outAsAWriter();
                        // Download the web resource.
                        webResource = petition.download();


                        cache.accessAsAWriter();
                        synchronized(this)
                        {
                            cNode = cache.addNode ( webResource, new AtomicInteger(1) );

                        }
                        cache.outAsAWriter();
                        /*Check if other threads are waiting*/
                        if(node.getWaiting()>0)
                        {
                            Condition condition2=nodo.getWriteLock().newCondition();
                            condition2.signalAll();
                        }
                        else
                        {
                            downloads.accessAsAWriter();
                            synchronized(this)
                            {
                                downloads.delete(node);
                            }
                            downloads.outAsAWriter();
                        }
                    }

                }
                //Sends response to the client
            } while (true);

        } catch (InterruptedException e) {
             // TODO: poner una forma de terminar.
        } catch (Exception e) {

        }
    }
}

}

There are a few things to take into account:

-I have to create condition variables in every node so threads will wait because of that and implement methods to open/close locks and guarantee mutual exclusion.

-If one thread is downloading a resource that other threads are interested in, I used a Condition variable to wait and to say "stop waiting" when the resource was downloaded and stored in the cache. What am I doing wrong in that part? (all of it is in MainThread class)

-When guaranteeing mutual exclusion, do I have to put synchronized in the proper methods, do the following way: -Lock -synchronized(this) and do whatever I have to -Unlock or am I doing the same in both ways? I have to stablish critic regions when managing the download list and the cache, so I think that there are parts where adding synchronized in the name of the method that it is not neeeded, or I'm using wrongly the read and write locks.

Note: As I translated the name of the variables and methods in order to help to be understood I may have written some things differently, but in my program is correct

Thanks a lot in advanced


Solution

  • Firstly, it's very hard to read that code. In your MainThread you nested if statement few times and without deep code analysing, its hard to tell what is wrong.

    But few things get my attention.

    In Downloads class you grab references for read and write lock from two different ReentrantReadWriteLock objects!

    private Lock readLock=new ReentrantReadWriteLock().readLock();
    private Lock writeLock=new ReentrantReadWriteLock().writeLock(); 
    

    It can't work! You should create one ReentrantReadWriteLock instance for Download object and obtain references for readLock and writeLock lock from it. Read and Write lock are connected, so when you lock on writeLock you prevent readers from executing protected part of code. Pleas read more about ReadWriteLock and you can look at example of using ReentrantReadWriteLock in mbassador EventBus.

    What's more, is that you don't need locks in Downloads and DownloadsNode either! Because you create instances of those class per thread and you don't share it between threads. So you can simply remove all code responsible for locking from those classes. The only object witch is shared among the thread is instance of Cache class.

    Furthermore you don't need synchronized(this) in MainThread! Each new thread created from this class will be synchronized on different object (this). So it's doesn't make sense to add all of those synchronized blocks. The worst thing is that I see only one thread in your attached code.

    I think you should remove all code responsible for locking and synchronizing, and then analyse witch objects are shared between threads and should be protected from concurrent access. For me only object witch need to be locked is Cache instance.

    Maybe if you post working SSCCE some one can help you with refactoring this code. VGR point few good tips in his comment.