java multithreading synchronization wait notify

wait() and notifyAll() do not work in my program


I am writing message-oriented middleware in which subscribers and publishers communicate through queues held by a central class, MessageBroker. Publishers should put messages into a topic's queue if it is not full, and subscribers should get messages from a subscribed topic's queue if it is not empty. The problem arises when I try to use wait() and notifyAll() in the subscriber's receiveMessage() method. With the publisher it works without any problems, but the subscribers are never woken from the wait state, so they do nothing.

Publisher method:

public synchronized void sendMessage() {
        BlockingQueue<Message> topicQueue = mb.getQueue(topic);
        if (topicQueue != null) {
            try {
                // Check whether there is space in the queue before sending the message
                while (topicQueue.remainingCapacity() == 0) {
                    wait();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            Message m = topic.generateMessage();
            mb.publish(m);
            if (!gotActive) {
                mb.increasePublisherCounter(1);
                gotActive = true;
            }
            System.out.println(m.getContent() + " was added to the queue");
            notifyAll(); // Notify all threads
        }
    }

Subscriber method:

public synchronized void receiveMessage() {
        for (Topic topic : topics) {
            BlockingQueue<Message> queue = mb.getQueue(topic);
            synchronized (queue) {
                try {
                    // Wait until the queue is no longer empty
                    while (queue.isEmpty()) {
                        queue.wait(); // Wait for a notification while the queue is empty
                    }

                    // Fetch the message from the queue
                    Message message = queue.peek();
                    if (message != null) {
                        System.out.println(name + " received message: " + message.getContent());
                        incrementProcessedCounter(topic);
                        if (topic.getSubscriberCount() == getProcessedCounter(topic)) {
                            queue.remove();
                            resetProcessedCounter(topic);
                            queue.notifyAll(); // Notify other waiting threads
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

What do I have to change in these methods to make them work? Thanks for your help. I can also add more code if you need it :)


Solution

  • when I try to include wait() and notifyAll() in the receiveMessage() method of the subscriber. With the Publisher it works without any problems, but with the Subscriber I have the problem that they are not retrieved from the wait state, so that they do nothing.

    I have a number of questions about your code, but the first comment is that you should not have to do any wait/notify yourself. One of the benefits of a BlockingQueue is that it handles the synchronization internally. Calling queue.take() will block until there is a message to be consumed, and queue.put() will wait until there is space in the queue to put it.

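    public void sendMessage() {
        BlockingQueue<Message> topicQueue = mb.getQueue(topic);
        if (topicQueue != null) {
            Message m = topic.generateMessage();
            try {
                // put() blocks until there is space in the queue
                topicQueue.put(m);
            } catch (InterruptedException e) {
                // restore the interrupt flag and give up on this message
                Thread.currentThread().interrupt();
                return;
            }
            // ... counters and the like?
        }
    }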
    

    and

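    public void receiveMessage() {
        for (Topic topic : topics) {
            BlockingQueue<Message> queue = mb.getQueue(topic);
            try {
                // take() blocks until a message is available
                Message message = queue.take();
                // ... work with the message
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop receiving
                Thread.currentThread().interrupt();
                return;
            }
        }
    }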
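
    To see the blocking behaviour in isolation, here is a minimal, self-contained sketch. It is not your middleware: BlockingQueueDemo, run() and the "message-N" strings are made up for illustration, and String stands in for your Message class. One producer and one consumer share a bounded ArrayBlockingQueue, with no explicit wait()/notifyAll() anywhere.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the code in the question.
class BlockingQueueDemo {

    // Runs one producer and one consumer over a bounded queue and returns
    // the messages in the order the consumer received them.
    static List<String> run(int capacity, int messageCount) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<String> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < messageCount; i++) {
                    queue.put("message-" + i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < messageCount; i++) {
                    received.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        consumer.start(); // started first, so it waits on the empty queue
        producer.start();
        try {
            producer.join();
            consumer.join(); // after join(), the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo was interrupted", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 5));
    }
}
```

    The queue holds only 2 elements, so the producer has to wait for the consumer several times, and the consumer starts out waiting on an empty queue. Both sides are woken by the queue itself.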
    

    As to why your code isn't working, it's very hard to tell. Here are some comments:
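
    One thing that stands out, judging only from the code you posted (mb.publish() is not shown, so this is an assumption): the publisher calls wait() and notifyAll() on itself (this), while the subscriber calls queue.wait() on the queue object. notifyAll() only wakes threads that are waiting on the same object, so unless mb.publish() itself calls queue.notifyAll() inside a synchronized (queue) block, nothing ever wakes the subscribers. Note also that queue.wait() releases only the queue's monitor; the lock taken by the synchronized receiveMessage() method stays held while the subscriber waits. If you do want to use wait/notify by hand, both sides have to use the same monitor. A minimal sketch, where Mailbox and its methods are hypothetical names and not from your code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical bounded buffer; both put() and take() use the SAME lock object.
class Mailbox {
    private final Object lock = new Object(); // the single monitor shared by both sides
    private final Deque<String> items = new ArrayDeque<>();
    private final int capacity;

    Mailbox(int capacity) {
        this.capacity = capacity;
    }

    void put(String item) throws InterruptedException {
        synchronized (lock) {
            while (items.size() == capacity) {
                lock.wait(); // releases lock until someone calls lock.notifyAll()
            }
            items.addLast(item);
            lock.notifyAll(); // wakes consumers waiting on the same lock
        }
    }

    String take() throws InterruptedException {
        synchronized (lock) {
            while (items.isEmpty()) {
                lock.wait();
            }
            String item = items.removeFirst();
            lock.notifyAll(); // wakes producers waiting on the same lock
            return item;
        }
    }

    // Starts a consumer, then sends it one message, and returns what the
    // consumer received.
    static String demo() {
        Mailbox box = new Mailbox(1);
        String[] result = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                result[0] = box.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            box.put("hello");
            consumer.join(); // makes result[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo was interrupted", e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "hello"
    }
}
```

    This is essentially what a BlockingQueue already does for you, which is why I would drop the manual wait/notify entirely.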

    Hope something here helps.