I am new to OpenMP, so it took me some time to figure out how to ask the question in a way that is easy for experts to understand.
I have been trying to formulate the best way to state the problem. Previous attempts are:
But I think I have found the most parsimonious way to ask
Q: How can we implement a parallel set of FIFO task queues ?
So the queues can execute in parallel with each other, but inside each queue the tasks must execute in FIFO order, i.e. sequentially.
We basically need a master thread that feeds the FIFO queues and a set of thread pools that picks from these queues and executes it as threads become available.
Hopefully this is the best way to ask instead of pseudo code examples
I don't think OpenMP tasks are a good match for this. Three reasons for this:
Therefore I suggest you use a normal FIFO setup. You can still use OpenMP's thread-pool for this. Here is a quick outline of how it may look:
We start with a few placeholders for your sensor data. For functions I follow pthread's convention: return 0 on success, error code otherwise.
/* One unit of work handed from the producer to a consumer. */
struct SensorPacket
{
    int sensor_id;  /* used by the producer to select the target FIFO */
    int packetno;
    void* data;     /* payload; released through sensor_packet_free() */
};
/*
 * Placeholder sensor API, implemented elsewhere.
 * The int-returning functions follow the pthread convention:
 * 0 on success, an error code otherwise.
 */
int sensor_packet_next(struct SensorPacket *out);
int sensor_packet_process(struct SensorPacket *out);
void sensor_packet_free(struct SensorPacket *in);
For the FIFO I use a textbook blocking ring buffer. Blocking means we need condition variables which (unless I'm mistaken) are not supported by OpenMP, so I use pthreads for this. We could instead go for spinlocks or lockfree designs. However, since there is a good chance either the producer will outperform the consumer or vice-versa, putting one side to sleep might boost the clock speed of the other side. Therefore I think a blocking / sleeping FIFO is good, at least as a baseline for benchmarks of other approaches.
#include <errno.h>
/* using EPIPE */
#include <pthread.h>
#include <stdbool.h>
/* Number of slots in the ring; one slot always stays unused (see full test). */
#define FIFO_SIZE 64

/*
 * Blocking ring buffer of sensor packets.
 * All fields other than the primitives themselves are guarded by `mutex`.
 */
struct PacketFifo
{
    pthread_mutex_t mutex;
    int head;                   /* index of the next packet to pop */
    int tail;                   /* index of the next free slot to push into */
    _Bool closed;               /* set once by packet_fifo_close() */
    pthread_cond_t write_avail; /* signalled after a pop and on close */
    pthread_cond_t read_avail;  /* signalled after a push and on close */
    struct SensorPacket buf[FIFO_SIZE];
};
/*
 * Prepare an empty, open FIFO.
 *
 * Returns 0 on success or the error code of the failing pthread call.
 * On failure every primitive that was already initialized is destroyed
 * again, so the caller must not call packet_fifo_destroy().
 */
int packet_fifo_init(struct PacketFifo* self)
{
    int rc = pthread_mutex_init(&self->mutex, NULL);
    if(rc != 0)
        return rc;

    self->head = 0;
    self->tail = 0;
    self->closed = false;

    rc = pthread_cond_init(&self->write_avail, NULL);
    if(rc == 0) {
        rc = pthread_cond_init(&self->read_avail, NULL);
        if(rc == 0)
            return 0;
        /* second condition variable failed: undo the first one */
        pthread_cond_destroy(&self->write_avail);
    }
    pthread_mutex_destroy(&self->mutex);
    return rc;
}
/*
 * Index helpers for the ring buffer. None of them lock; callers either hold
 * self->mutex or have exclusive access to the FIFO.
 *
 * They are `static inline` on purpose: a plain `inline` definition in C99/C11
 * provides no external definition, so any call the compiler chooses not to
 * inline (e.g. at -O0) would fail to link.
 */

/* True when there is no packet to pop. */
static inline _Bool packet_fifo_empty(const struct PacketFifo* self)
{ return self->head == self->tail; }

/* Index following head, wrapping at FIFO_SIZE. */
static inline int packet_fifo_next_head(const struct PacketFifo* self)
{ return self->head == FIFO_SIZE - 1 ? 0 : self->head + 1; }

/* Index following tail, wrapping at FIFO_SIZE. */
static inline int packet_fifo_next_tail(const struct PacketFifo* self)
{ return self->tail == FIFO_SIZE - 1 ? 0 : self->tail + 1; }

/*
 * True when a push would make tail catch up with head. One slot is left
 * unused so that "full" and "empty" remain distinguishable.
 */
static inline _Bool packet_fifo_full(const struct PacketFifo* self)
{ return self->head == packet_fifo_next_tail(self); }
/*
 * Tear down a FIFO. Packets still queued are released with
 * sensor_packet_free() before the synchronization primitives are destroyed.
 * Takes no lock.
 */
void packet_fifo_destroy(struct PacketFifo* self)
{
    /* walk head forward over everything that was never popped */
    for(; ! packet_fifo_empty(self); self->head = packet_fifo_next_head(self))
        sensor_packet_free(&self->buf[self->head]);

    pthread_cond_destroy(&self->read_avail);
    pthread_cond_destroy(&self->write_avail);
    pthread_mutex_destroy(&self->mutex);
}
/*
 * Append a copy of *packet to the FIFO, sleeping while the FIFO is full.
 *
 * Returns 0 on success, EPIPE if the FIFO is closed (the packet is not
 * stored), or the error code of a failing pthread call.
 */
int packet_fifo_push(
        struct PacketFifo* self, const struct SensorPacket* packet)
{
    int rc = pthread_mutex_lock(&self->mutex);
    if(rc != 0)
        return rc;

    /* sleep until a slot frees up or somebody closes the FIFO */
    while(! self->closed && packet_fifo_full(self)) {
        rc = pthread_cond_wait(&self->write_avail, &self->mutex);
        if(rc != 0) {
            pthread_mutex_unlock(&self->mutex);
            return rc;
        }
    }

    if(self->closed) {
        pthread_mutex_unlock(&self->mutex);
        return EPIPE;
    }

    self->buf[self->tail] = *packet;
    self->tail = packet_fifo_next_tail(self);

    /*
     * Unlock before signalling so the woken thread does not immediately
     * block on the mutex. The push can no longer be undone at this point,
     * hence the results of these two calls are deliberately not reported.
     */
    pthread_mutex_unlock(&self->mutex);
    pthread_cond_signal(&self->read_avail);
    return 0;
}
/*
 * Remove the oldest packet into *packet, sleeping while the FIFO is empty
 * and still open.
 *
 * Returns 0 on success, EPIPE once the FIFO is closed and fully drained, or
 * the error code of a failing pthread call. Packets queued before the close
 * are still delivered.
 */
int packet_fifo_pop(
        struct PacketFifo* self, struct SensorPacket* packet)
{
    int rc = pthread_mutex_lock(&self->mutex);
    if(rc != 0)
        return rc;

    /* sleep until a packet arrives or somebody closes the FIFO */
    while(! self->closed && packet_fifo_empty(self)) {
        rc = pthread_cond_wait(&self->read_avail, &self->mutex);
        if(rc != 0) {
            pthread_mutex_unlock(&self->mutex);
            return rc;
        }
    }

    /* still empty after the wait means: closed and nothing left */
    if(packet_fifo_empty(self)) {
        pthread_mutex_unlock(&self->mutex);
        return EPIPE;
    }

    *packet = self->buf[self->head];
    self->head = packet_fifo_next_head(self);

    /* unlock first, then wake a producer waiting for space */
    pthread_mutex_unlock(&self->mutex);
    pthread_cond_signal(&self->write_avail);
    return 0;
}
/*
 * Mark the FIFO as closed and wake every thread sleeping in push or pop.
 *
 * Returns 0 on success or the error code of pthread_mutex_lock(); in the
 * latter case the FIFO is left untouched.
 */
int packet_fifo_close(struct PacketFifo* self)
{
    int rc = pthread_mutex_lock(&self->mutex);
    if(rc == 0) {
        self->closed = true;
        pthread_mutex_unlock(&self->mutex);
        /* both sides may be asleep; wake all of them */
        pthread_cond_broadcast(&self->write_avail);
        pthread_cond_broadcast(&self->read_avail);
    }
    return rc;
}
Now all we need is one FIFO per consumer thread and we can let the producer distribute the work among them.
#include <stdlib.h>
/* using malloc, free */
/*
 * Allocate and initialize an array of `threads` FIFOs.
 *
 * On success stores the array in *out and returns 0; release it with
 * free_fifos(). On failure returns a non-zero error code, leaves *out
 * untouched and releases everything that was set up so far:
 *   EINVAL  threads < 1
 *   ENOMEM  the array could not be allocated
 *   other   error code from packet_fifo_init()
 */
static int alloc_fifos(int threads, struct PacketFifo** out)
{
    int i, err = 0;
    struct PacketFifo* rtrn;
    /* malloc(0) may return NULL without being an error; reject up front */
    if(threads < 1)
        return EINVAL;
    /*
     * Report ENOMEM explicitly: errno is not guaranteed to be set by
     * malloc and a stale 0 would look like success to the caller.
     */
    if((rtrn = malloc((size_t) threads * sizeof *rtrn)) == NULL)
        return ENOMEM;
    for(i = 0; i < threads; ++i)
        if((err = packet_fifo_init(rtrn + i)) != 0)
            goto err_destr;
    *out = rtrn;
    return 0;
err_destr:
    /* FIFO i failed and cleaned up after itself; destroy 0 .. i-1 */
    while(i--)
        packet_fifo_destroy(rtrn + i);
    free(rtrn); /* the array itself was previously leaked here */
    return err;
}
/*
 * Destroy the `threads` FIFOs in `array` (last to first) and release the
 * array itself. Counterpart of alloc_fifos().
 */
static void free_fifos(struct PacketFifo* array, int threads)
{
    int idx;
    for(idx = threads - 1; idx != -1; --idx)
        packet_fifo_destroy(&array[idx]);
    free(array);
}
/*
 * Producer loop: fetch packets and distribute them over the consumer FIFOs.
 *
 * All packets of one sensor go to the same FIFO (sensor_id modulo the number
 * of consumers), which keeps them in order. The loop ends when
 * sensor_packet_next() or packet_fifo_push() reports an error; afterwards
 * every FIFO is closed so the consumers terminate once they have drained
 * their queue.
 *
 * consumer_threads must be >= 1. Returns the error code that ended the loop.
 */
static int run_producer(struct PacketFifo* fifos, int consumer_threads)
{
    int err;
    struct SensorPacket packet;
    while(! (err = sensor_packet_next(&packet))) {
        /*
         * Use unsigned arithmetic: with a signed operand, % yields a
         * negative result for negative IDs and the index would point
         * before the start of the array.
         */
        unsigned slot = (unsigned) packet.sensor_id
                % (unsigned) consumer_threads;
        if((err = packet_fifo_push(fifos + slot, &packet)) != 0) {
            /* packet never entered a FIFO, so it is still ours to free */
            sensor_packet_free(&packet);
            break;
        }
    }
    while(consumer_threads--)
        packet_fifo_close(fifos + consumer_threads);
    return err;
}
static int run_consumer(struct PacketFifo* fifo)
{
int err;
struct SensorPacket packet;
while(! (err = packet_fifo_pop(fifo, &packet))) {
err = sensor_packet_process(&packet);
sensor_packet_free(&packet);
if(err)
goto err_close;
}
if(err == EPIPE) /* producer closed */
return 0;
err_close:
packet_fifo_close(fifo); /* notify producer */
return err;
}
Again, we can use OpenMP's thread pool to launch the whole endeavor.
#include <omp.h>
/*
 * Run one producer and up to `sensor_count` consumers on OpenMP's thread
 * pool. Thread 0 is the producer; thread k > 0 consumes FIFO k-1.
 *
 * Returns the producer's result (see run_producer), or
 *   EINVAL  sensor_count < 1
 *   EAGAIN  fewer than two threads are available, or the runtime delivered
 *           a smaller team than requested
 *   other   error code from alloc_fifos()
 * Return values of the consumers are not reported.
 */
int run_producer_consumer(int sensor_count)
{
    int max_threads, consumer_threads, err;
    struct PacketFifo* fifos;
    /* zero or negative would give no consumers and a modulo by zero */
    if(sensor_count < 1)
        return EINVAL;
    max_threads = omp_get_max_threads();
    if(max_threads < 2)
        return EAGAIN;
    /* never more consumers than sensors; one thread stays the producer */
    consumer_threads = max_threads <= sensor_count ?
            max_threads - 1 : sensor_count;
    if((err = alloc_fifos(consumer_threads, &fifos)) != 0)
        return err;
#   pragma omp parallel num_threads(consumer_threads + 1)
    {
        int threadnum = omp_get_thread_num();
        if(omp_get_num_threads() != consumer_threads + 1) {
            /*
             * num_threads is only a request. With a smaller team some
             * FIFOs would have no consumer and the producer would block
             * forever on them, so nobody starts working.
             */
            if(threadnum == 0)
                err = EAGAIN;
        }
        else if(threadnum)
            run_consumer(fifos + threadnum - 1);
        else
            err = run_producer(fifos, consumer_threads);
    }
    free_fifos(fifos, consumer_threads);
    return err;
}
Can you please explain what is the semantics for
packet_fifo_close
?
We need a way to stop the process and also deal with errors, unless we don't care about either. In particular, the producer may run out of elements to produce. In either case, the side of the FIFO that does not continue, closes said FIFO.
At that point, the rules are pretty straightforward: The producer can no longer push elements into the FIFO since there is no guarantee a consumer will get to them. On the other hand, a consumer will still read the elements remaining in the FIFO before getting the return value indicating the closed FIFO. That means the remaining elements in the FIFO are drained once the producer stops.
The closed
condition is signalled with the error return value EPIPE
, just to mimic similar semantics in a Unix pipe.
I have modified the
#pragma omp parallel
to launch consumers only when we have FIFO_SIZE * max_threads - 1
packets already pushed to fifo
Not a good idea. You want to overlap production and consumption as much as possible because this maximizes parallelization. It also prevents potential deadlocks when the FIFOs get full before your launch criterion is reached.
Would you be able to modify the code to showcase reading from a binary file where
sensor_id
is 2 bytes and sensor_value
is the next 2 bytes and sensor_sequence
is the next 2 bytes? So a 6-byte packet of concatenated binary data. And sensor_packet_process
can just print the sequence number and value
Sure. You didn't specify whether these values are little-endian or big-endian so I assume the native machine order.
/*
 * One record as it is stored in the input file, in native byte order.
 * The member order is the on-disk order and must not be changed.
 */
struct SensorPacket
{
    short sensor_id;
    short sensor_value;
    short sensor_sequence;
};
#include <stdio.h>
/*
 * Read the next packet from `infile` into *out.
 *
 * Returns 0 on success, EIO if the stream's error indicator is set, and
 * ENOENT when no complete packet could be read for any other reason
 * (end of file).
 */
int sensor_packet_next(FILE* infile, struct SensorPacket* out)
{
    size_t items = fread(out, sizeof(*out), 1, infile);
    if(items != 0)
        return 0; /* success */
    return ferror(infile) ? EIO : ENOENT;
}
/*
 * Print one line "<id> <sequence> <value>" for the packet on stdout.
 *
 * Returns 0 on success or EIO if printf() reports an output error.
 */
int sensor_packet_process(struct SensorPacket* in)
{
    int written = printf("%d %d %d\n",
            in->sensor_id, in->sensor_sequence, in->sensor_value);
    return written < 0 ? EIO : 0;
}
/*
 * Release resources owned by a packet. This packet layout owns none, so
 * the function does nothing.
 *
 * The parameter is named because omitting the name in a function
 * definition is not valid C before C23.
 */
void sensor_packet_free(struct SensorPacket* in)
{
    (void) in; /* nothing to do */
}
/*
 * Excerpt, not a complete definition: run_producer gains a FILE* parameter
 * and forwards it to sensor_packet_next(). Lines shown as "..." are elided;
 * presumably they are unchanged from the earlier run_producer.
 */
static int run_producer(
FILE* infile, struct PacketFifo* fifos, int consumer_threads)
{
...
while(! (err = sensor_packet_next(infile, &packet))) {
...
}
/*
 * Excerpt, not a complete definition: run_producer_consumer gains a FILE*
 * parameter and passes it on to run_producer(). Lines shown as "..." are
 * elided; presumably they are unchanged from the earlier version.
 */
int run_producer_consumer(FILE* infile, int sensor_count)
{
...
err = run_producer(infile, fifos, consumer_threads);
...
}
/*
 * Command line driver: INPUT_FILE is read as a stream of binary sensor
 * packets which are processed by run_producer_consumer().
 *
 * Exit status: 0 on success, 1 on wrong usage, 2 if the file cannot be
 * opened, 3 if processing failed (the reason is printed via perror).
 */
int main(int argc, char** argv)
{
    FILE* infile;
    int err, sensor_count = 5000;
    if(argc != 2) {
        /* argv[0] is not guaranteed to exist when argc is 0 */
        fprintf(stderr, "Usage: %s INPUT_FILE\n",
                argc ? argv[0] : "executable");
        return 1;
    }
    /* the closing parenthesis of this condition was missing before */
    if(! (infile = fopen(argv[1], "rb"))) {
        fprintf(stderr, "Cannot open file: '%s'\n", argv[1]);
        return 2;
    }
    err = run_producer_consumer(infile, sensor_count);
    fclose(infile);
    switch(err) {
    case 0: break; /* shouldn't happen */
    case ENOENT: break; /* no error, file end reached */
    default:
        errno = err;
        perror(NULL);
        return 3;
    }
    return 0;
}
Side note: A-priori knowledge of the number of sensors isn't needed. We might as well launch like this:
/*
 * Variant that needs no a-priori sensor count: one producer plus one
 * consumer per remaining OpenMP thread. Thread 0 is the producer; thread
 * k > 0 consumes FIFO k-1.
 *
 * Returns the producer's result (see run_producer), or
 *   EAGAIN  fewer than two threads are available, or the runtime delivered
 *           a team of a different size than requested
 *   other   error code from alloc_fifos()
 * Return values of the consumers are not reported.
 */
int run_producer_consumer(FILE* infile)
{
    int consumer_threads, err;
    struct PacketFifo* fifos;
    consumer_threads = omp_get_max_threads() - 1;
    if(consumer_threads < 1) /* not enough threads available */
        return EAGAIN;
    if((err = alloc_fifos(consumer_threads, &fifos)) != 0)
        return err;
    /* request exactly the team size the FIFO array was sized for */
#   pragma omp parallel num_threads(consumer_threads + 1)
    {
        int threadnum = omp_get_thread_num();
        if(omp_get_num_threads() != consumer_threads + 1) {
            /*
             * The team size is only a request. With a smaller team some
             * FIFOs would have no consumer and the producer would block
             * forever on them, so nobody starts working.
             */
            if(threadnum == 0)
                err = EAGAIN;
        }
        else if(threadnum)
            run_consumer(fifos + threadnum - 1);
        else
            err = run_producer(infile, fifos, consumer_threads);
    }
    free_fifos(fifos, consumer_threads);
    return err;
}
My number of sensors is much greater than my number of threads: I have 32 threads and more than 5000 sensors. So I think I am facing a deadlock, as the producer has not produced data for a
sensor_id%threads
and is waiting to be signaled when another consumer thread enters and deadlocks waiting for mutex.
That shouldn't happen assuming you launch producers and consumers at the same time. Here is a simple producer without file IO that I built and that works fine on my 16 thread system.
/*
 * Synthetic producer for testing without file I/O.
 *
 * Emits packets for sensor IDs 0..4999 in round-robin order with a random
 * value; the sequence number increases by one after each full round.
 * Keeps its position in function-local static variables and always
 * returns 0.
 */
int sensor_packet_next(struct SensorPacket* out)
{
    static int next_sensor, next_sensor_sequence;
    out->sensor_id = (short) next_sensor;
    out->sensor_value = (short) rand();
    out->sensor_sequence = (short) next_sensor_sequence;
    /* advance to the next sensor; wrap around after the last one */
    if(++next_sensor == 5000) {
        next_sensor = 0;
        next_sensor_sequence++;
    }
    return 0;
}
The worst that can happen is that you have more than 64 times the same sensor ID in a row (or one sensor ID modulo consumer count). Then the producer would fill one FIFO to the brim and has to wait until the consumer drains the FIFO while all other consumers are idle.
In that case increase the FIFO size until it is larger than the maximum number expected in a row. Or switch to a dynamically growing FIFO. But then you have to be careful again to not overload memory.