I am writing a message oriented middleware, in which subscribers and publishers communicate via a queue in the middle class MessageBroker. publishers should drop messages into a queue of a topic if it is not full and subscribers should get the message of a subscribed topic if the queue is not empty. The problem now arises when I try to include wait() NotifyAll() in the receiveMessage() method of the subscriber. With the Publisher it works without any problems, but with the Subscriber I have the problem that they are not retrieved from the wait state, so that they do nothing.
Publisher method:
public synchronized void sendMessage() {
BlockingQueue<Message> topicQueue = mb.getQueue(topic);
if (topicQueue != null) {
try {
// Überprüfen, ob Platz in der Queue ist, bevor Nachricht gesendet wird
while (topicQueue.remainingCapacity() == 0) {
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
Message m = topic.generateMessage();
mb.publish(m);
if (!gotActive) {
mb.increasePublisherCounter(1);
gotActive = true;
}
System.out.println(m.getContent() + " wurde zur Queue hinzugefügt");
notifyAll(); // Alle Threads benachrichtigen
}
}
Subscriber method:
public synchronized void receiveMessage() {
for (Topic topic : topics) {
BlockingQueue<Message> queue = mb.getQueue(topic);
synchronized (queue) {
try {
// Warten, bis die Warteschlange nicht mehr leer ist
while (queue.isEmpty()) {
queue.wait(); // Warte auf Benachrichtigung, wenn die Warteschlange leer ist
}
// Nachricht aus der Warteschlange holen
Message message = queue.peek();
if (message != null) {
System.out.println(name + " hat Nachricht erhalten: " + message.getContent());
incrementProcessedCounter(topic);
if (topic.getSubscriberCount() == getProcessedCounter(topic)) {
queue.remove();
resetProcessedCounter(topic);
queue.notifyAll(); // Benachrichtige andere wartende Threads
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
what do I have to change in these methods that the methods will work? Thanks for your help. I can also add more code if you need :)
when I try to include wait() NotifyAll() in the receiveMessage() method of the subscriber. With the Publisher it works without any problems, but with the Subscriber I have the problem that they are not retrieved from the wait state, so that they do nothing.
I have a number of questions about your code but the first comment is that you should not have to do any wait/notify yourself. One of the benefits of the BlockingQueue
is that they are fully synchronized. Calling queue.take()
will block until there is a message to be consumed and queue.put()
will wait until there is space in the queue to put it.
public void sendMessage() {
BlockingQueue<Message> topicQueue = mb.getQueue(topic);
if (topicQueue != null) {
Message m = topic.generateMessage();
topicQueue.put(m);
// ... counters and the like?
}
}
and
public void receiveMessage() {
for (Topic topic : topics) {
BlockingQueue<Message> queue = mb.getQueue(topic);
Message message = queue.take();
// ... work with the message
}
}
As to why your code isn't working it's very hard to tell. Here are some comments:
topicQueue
in your sendMessage()
.peek()
and remove()
can be accomplished with a poll()
which does both of those things, returning null
if no messages in the queue. Again take()
waits.InterruptedException
. You should at least do a Thread.currentThread().interrupt();
in the catch block but you should also return or break out of the loop or something.Hope something here helps.