Let's say I have some consumer threads that loop forever, reading elements from a blocking queue, and some producer threads that each put a certain number of elements on the same queue and then return. The consumers don't know how many elements will be inserted, which is why they keep looping. After all the elements have been produced and consumed, the consumer threads get stuck waiting for more.
My goal is to make the consumers terminate in this scenario, and to do so with "good" code.
One possible solution in Java (that I don't think is easy/worth it to implement in C, correct me if I'm mistaken) that looks good and "well abstracted" is to use Thread.interrupt() and exceptions in the following way, with 1 consumer and 1 producer in this example:
(haven't used java in a long time, sorry if there are mistakes, especially on generics)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Consume<T> implements Runnable {
    private final BlockingQueue<T> queue;
    public Consume(BlockingQueue<T> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                T elem = queue.take(); // Blocking
                // Do something with the element
            }
        } catch (InterruptedException e) {
            // Do nothing and quit
        }
    }
}
public class Produce<T> implements Runnable {
    private final BlockingQueue<T> queue;
    public Produce(BlockingQueue<T> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        // Insert a random number of elements in the queue, and then quit
    }
}
public class Main {
    public static void main(String[] args) throws InterruptedException {
        int capacity = 16; // arbitrary capacity, chosen only for illustration
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread consumer = new Thread(new Consume<>(queue));
        Thread producer = new Thread(new Produce<>(queue));
        consumer.start();
        producer.start();
        producer.join();      // Wait until the producer has finished
        consumer.interrupt(); // Make the blocked take() throw InterruptedException
    }
}
This looks good enough to me because to terminate them I only need to use the same function (Thread.interrupt) for every type of data structure and without the need to access the internal implementation of the data structures.
In C, using pthreads, I could implement a solution with pthread_cond_signal / pthread_cond_broadcast and the setting of some flag, but I would need to access the internal implementation of the data structure.
Is there a better way in general to face this problem? If not, can I implement something similar to the Java solution in C?
I found out that on MacOS a system signal received by a thread waiting with pthread_cond_wait will cause the function to return. I started writing a solution similar to the Java one based on this mechanism only to find out that pthread_cond_wait doesn't return on Linux, and I need my code to work on both OSes.
What's the best way to terminate consumer/producer threads waiting on blocking data structures that are empty/full in C?
The best way is to use (possibly write) a data structure and associated functions that provide explicitly for external interruption. The usual idiom for this sort of thing relies on the Swiss army knife of thread synchronization: the condition variable. That applies in both C and Java, although Java makes that more accessible by endowing every Object with built-in mutex and CV functionality.* Such a data structure uses no blocking functions other than mutex and CV waits, and interruption revolves around broadcasting to that CV.
To understand how that would work you must first understand how properly to use CVs to implement blocking data structures in general. Note in particular that the correct idiom for retrieving data from such a data structure follows these steps:
Retrieve datum
1. Lock the data structure
2. Check whether a datum is available for retrieval
3. If not:
   3.1. wait upon a CV designated for the purpose
   3.2. upon return from the wait, loop back to (2)
4. Remove the datum
5. Unlock the data structure
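The retrieval steps above could be sketched in C with pthreads roughly as follows. This is only an illustration under stated assumptions: `struct int_queue`, `int_queue_take`, the `CAP` constant and the fixed-size ring buffer of `int` are all hypothetical names and choices, not part of any real library, and the sketch ignores producers that might be waiting for free space.

```c
#include <pthread.h>
#include <stddef.h>

#define CAP 16  /* hypothetical fixed capacity */

/* Hypothetical queue type: a ring buffer guarded by one mutex and one CV. */
struct int_queue {
    pthread_mutex_t lock;
    pthread_cond_t  changed;   /* waited on until the contents change */
    int    items[CAP];
    size_t head;               /* index of the oldest element */
    size_t count;              /* number of stored elements */
};

/* Blocks until an element is available, then removes and returns it. */
int int_queue_take(struct int_queue *q)
{
    int v;

    pthread_mutex_lock(&q->lock);                  /* step 1 */
    while (q->count == 0)                          /* step 2 */
        pthread_cond_wait(&q->changed, &q->lock);  /* steps 3.1 and 3.2 */
    v = q->items[q->head];                         /* step 4 */
    q->head = (q->head + 1) % CAP;
    q->count--;
    pthread_mutex_unlock(&q->lock);                /* step 5 */
    return v;
}
```

The `while` loop, rather than an `if`, is what implements "loop back to (2)": the predicate is rechecked after every return from `pthread_cond_wait`, which also covers spurious wakeups.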
The corresponding procedure for depositing data in that structure follows these steps:
Deposit datum
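As a hedged sketch, depositing usually takes the shape lock, add the datum, wake waiters, unlock. The C code below assumes that shape; `struct int_queue`, `int_queue_put` and `CAP` are hypothetical names, and the choice to wait on the same CV while the buffer is full is an assumption of this example (it only works if removal also broadcasts to that CV).

```c
#include <pthread.h>
#include <stddef.h>

#define CAP 16  /* hypothetical fixed capacity */

/* Hypothetical queue type: a ring buffer guarded by one mutex and one CV. */
struct int_queue {
    pthread_mutex_t lock;
    pthread_cond_t  changed;   /* waited on until the contents change */
    int    items[CAP];
    size_t head;               /* index of the oldest element */
    size_t count;              /* number of stored elements */
};

/* Appends v, blocking while the buffer is full. */
void int_queue_put(struct int_queue *q, int v)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == CAP)                        /* full: wait for room */
        pthread_cond_wait(&q->changed, &q->lock);
    q->items[(q->head + q->count) % CAP] = v;      /* add the datum */
    q->count++;
    pthread_cond_broadcast(&q->changed);           /* wake any waiters */
    pthread_mutex_unlock(&q->lock);
}
```

Broadcasting rather than signaling is the conservative choice here because, with a single CV, both consumers and blocked producers may be waiting on it.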
Notes:
Now consider step (2) in the retrieval procedure. It involves performing a test before every possible wait, and control returns there after every wait to determine whether a new wait should be performed. The simple scheme above makes that determination based only on whether any data are available, but you can insert any additional conditions you like there. The combined condition tested here is what gives "condition variables" their name.
In particular, you can add a test for whether a termination flag has been set. With that modification in place, the termination procedure looks a lot like the data deposition procedure. For example:
Terminate
In this case, however, it is essential to broadcast to the CV rather than signaling it if there is any possibility that more than one thread is waiting.
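Putting the pieces together, a minimal sketch of a queue with a termination flag might look like the following. Everything named here (`struct int_queue`, `int_queue_terminate`, `run_demo`, the capacity of 4, the two consumers) is a hypothetical choice for illustration. The sketch assumes that consumers should drain any remaining elements before quitting, so `int_queue_take` reports failure only when the queue is both empty and terminated.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define CAP 4  /* hypothetical, deliberately small capacity */

struct int_queue {
    pthread_mutex_t lock;
    pthread_cond_t  changed;     /* broadcast on every state change */
    int    items[CAP];
    size_t head, count;
    bool   terminated;           /* set once by int_queue_terminate() */
};

void int_queue_init(struct int_queue *q)
{
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->changed, NULL);
    q->head = q->count = 0;
    q->terminated = false;
}

void int_queue_put(struct int_queue *q, int v)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == CAP)                      /* full: wait for room */
        pthread_cond_wait(&q->changed, &q->lock);
    q->items[(q->head + q->count) % CAP] = v;
    q->count++;
    pthread_cond_broadcast(&q->changed);
    pthread_mutex_unlock(&q->lock);
}

/* Stores an element in *out and returns true, or returns false once the
   queue is empty and termination has been requested. */
bool int_queue_take(struct int_queue *q, int *out)
{
    bool ok;

    pthread_mutex_lock(&q->lock);
    while (q->count == 0 && !q->terminated)      /* the combined condition */
        pthread_cond_wait(&q->changed, &q->lock);
    ok = q->count > 0;
    if (ok) {
        *out = q->items[q->head];
        q->head = (q->head + 1) % CAP;
        q->count--;
        pthread_cond_broadcast(&q->changed);     /* room for producers */
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

/* Termination: lock, set the flag, broadcast, unlock. */
void int_queue_terminate(struct int_queue *q)
{
    pthread_mutex_lock(&q->lock);
    q->terminated = true;
    pthread_cond_broadcast(&q->changed);         /* wake every waiter */
    pthread_mutex_unlock(&q->lock);
}

struct worker { struct int_queue *q; long sum; };

static void *consume(void *arg)
{
    struct worker *w = arg;
    int v;

    while (int_queue_take(w->q, &v))             /* false means: quit */
        w->sum += v;
    return NULL;
}

static void *produce(void *arg)
{
    for (int i = 1; i <= 100; i++)
        int_queue_put(arg, i);
    return NULL;
}

/* One producer, two consumers; returns the sum of everything consumed. */
long run_demo(void)
{
    struct int_queue q;
    struct worker w[2] = { { &q, 0 }, { &q, 0 } };
    pthread_t c[2], p;

    int_queue_init(&q);
    for (int i = 0; i < 2; i++)
        pthread_create(&c[i], NULL, consume, &w[i]);
    pthread_create(&p, NULL, produce, &q);
    pthread_join(p, NULL);                       /* all elements produced */
    int_queue_terminate(&q);                     /* then request shutdown */
    for (int i = 0; i < 2; i++)
        pthread_join(c[i], NULL);
    return w[0].sum + w[1].sum;
}
```

The caller's side mirrors the Java version from the question: join the producers, then call one function on the queue. The difference is that the termination request is part of the data structure's own interface, so callers never touch its internals.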
* Java also added higher-level abstractions for locks and CVs, and a bunch of standard data structures providing for safe concurrent use. To my knowledge, however, none of the standard library classes provide directly for the kind of explicit termination request required here.