copenmpmulticore

OpenMP multiple FIFO task queues


I am new to OpenMP so took some time to figure out the right way to ask the question so that its easier for experts to understand my queries.

I am been trying to formulate the best way to ask the problem: Previous attempts are:

Attempt1 Attempt2

But I think I have found the most parsimonious way to ask

Q: How can we implement a parallel set of FIFO task queues ?

So each queue can parallelly execute - but inside the queue - the execution order of tasks must be FIFO aka sequential.

We basically need a master thread that feeds the FIFO queues and a set of thread pools that picks from these queues and executes it as threads become available.

Hopefully this is the best way to ask instead of pseudo code examples


Solution

  • I don't think OpenMP tasks are a good match for this. Three reasons for this:

    1. We've been here for three questions on this. It's clearly not straightforward. Even if we find a way to make it work, the code will likely be very obtuse, hard to modify, and brittle
    2. With a producer-consumer setup, there is a chance that your producer outperforms your consumers. At that point you want to have a simple method to rate-limit your producer, e.g. through size-limiting your FIFOs. Otherwise you might run out of memory. It also increases the chance that a consumer can grab the next item from a shared cache level rather than main memory. Launching tasks in a fire-and-forget fashion isn't really helpful to achieve this
    3. Tasks are rather heavy-weight constructs. There is a very good chance that their implementation will have a higher runtime overhead than simpler solutions

    Therefore I suggest you use a normal FIFO setup. You can still use OpenMP's thread-pool for this. Here is a quick outline of how it may look:

    We start with a few placeholders for your sensor data. For functions I follow pthread's convention: return 0 on success, error code otherwise.

    struct SensorPacket
    {
        int sensor_id, packetno;
        void* data;
    };
    extern int sensor_packet_next(struct SensorPacket* out);
    extern int sensor_packet_process(struct SensorPacket* out);
    extern void sensor_packet_free(struct SensorPacket* in);
    

    For the FIFO I use a textbook blocking ring buffer. Blocking means we need condition variables which (unless I'm mistaken) are not supported by OpenMP, so I use pthreads for this. We could instead go for spinlocks or lockfree designs. However, since there is a good chance either the producer will outperform the consumer or vice-versa, putting one side to sleep might boost the clock speed of the other side. Therefore I think a blocking / sleeping FIFO is good, at least as a baseline for benchmarks of other approaches.

    #include <errno.h>
    /* using EPIPE */
    #include <pthread.h>
    #include <stdbool.h>
    
    #define FIFO_SIZE 64
    
    struct PacketFifo
    {
        pthread_mutex_t mutex;
        int head, tail;
        _Bool closed;
        pthread_cond_t write_avail, read_avail;
        struct SensorPacket buf[FIFO_SIZE];
    };
    
    int packet_fifo_init(struct PacketFifo* self)
    {
        int err;
        if((err = pthread_mutex_init(&self->mutex, NULL)) != 0)
            return err;
        self->head = self->tail = 0;
        self->closed = false;
        if((err = pthread_cond_init(&self->write_avail, NULL)) != 0)
            goto err_mut;
        if((err = pthread_cond_init(&self->read_avail, NULL)) != 0)
            goto err_write;
        return err;
      err_write:
        pthread_cond_destroy(&self->write_avail);
      err_mut:
        pthread_mutex_destroy(&self->mutex);
        return err;
    }
    inline _Bool packet_fifo_empty(const struct PacketFifo* self)
    { return self->head == self->tail; }
    
    inline int packet_fifo_next_head(const struct PacketFifo* self)
    { return self->head == FIFO_SIZE - 1 ? 0 : self->head + 1; }
    
    inline int packet_fifo_next_tail(const struct PacketFifo* self)
    { return self->tail == FIFO_SIZE - 1 ? 0 : self->tail + 1; }
    
    inline _Bool packet_fifo_full(const struct PacketFifo* self)
    { return self->head == packet_fifo_next_tail(self); }
    
    void packet_fifo_destroy(struct PacketFifo* self)
    {
        while(! packet_fifo_empty(self)) {
            sensor_packet_free(&self->buf[self->head]);
            self->head = packet_fifo_next_head(self);
        }
        pthread_cond_destroy(&self->read_avail);
        pthread_cond_destroy(&self->write_avail);
        pthread_mutex_destroy(&self->mutex);
    }
    
    int packet_fifo_push(
      struct PacketFifo* self, const struct SensorPacket* packet)
    {
        int err;
        if((err = pthread_mutex_lock(&self->mutex)) != 0)
            return err;
        while(packet_fifo_full(self) && ! self->closed)
            if((err = pthread_cond_wait(&self->write_avail, &self->mutex)) != 0)
                goto err_mut;
        if(self->closed) {
            err = EPIPE;
            goto err_mut;
        }
        self->buf[self->tail] = *packet;
        self->tail = packet_fifo_next_tail(self);
        /*
         * We unlock first, then signal. This is slightly faster (prevents signaled
         * thread from waiting for the mutex) but prevents error recovery, since we
         * cannot undo the push of the packet. So we don't report any error.
         * There should be none anyway
         */
        pthread_mutex_unlock(&self->mutex);
        pthread_cond_signal(&self->read_avail);
        return 0;
      err_mut:
        pthread_mutex_unlock(&self->mutex);
        return err;
    }
    int packet_fifo_pop(
      struct PacketFifo* self, struct SensorPacket* packet)
    {
        int err;
        if((err = pthread_mutex_lock(&self->mutex)) != 0)
            return err;
        while(packet_fifo_empty(self) && ! self->closed)
            if((err = pthread_cond_wait(&self->read_avail, &self->mutex)) != 0)
                goto err_mut;
        if(packet_fifo_empty(self)) { /* closed and drained */
            err = EPIPE;
            goto err_mut;
        }
        *packet = self->buf[self->head];
        self->head = packet_fifo_next_head(self);
        pthread_mutex_unlock(&self->mutex);
        pthread_cond_signal(&self->write_avail);
        return 0;
      err_mut:
        pthread_mutex_unlock(&self->mutex);
        return err;
    }
    int packet_fifo_close(struct PacketFifo* self)
    {
        int err;
        if((err = pthread_mutex_lock(&self->mutex)) != 0)
            return err;
        self->closed = true;
        pthread_mutex_unlock(&self->mutex);
        pthread_cond_broadcast(&self->write_avail);
        pthread_cond_broadcast(&self->read_avail);
        return 0;
    }
    

    Now all we need is one FIFO per consumer thread and we can let the producer distribute the work among them.

    #include <stdlib.h>
    /* using malloc, free */
    
    static int alloc_fifos(int threads, struct PacketFifo** out)
    {
        int i, err = 0;
        struct PacketFifo* rtrn;
        if((rtrn = malloc((unsigned) threads
              * sizeof(struct PacketFifo))) == NULL)
            return errno;
        for(i = 0; i < threads; ++i)
            if((err = packet_fifo_init(rtrn + i)) != 0)
                goto err_destr;
        *out = rtrn;
        return err;
      err_destr:
        while(i--)
            packet_fifo_destroy(rtrn + i);
        return err;
    }
    static void free_fifos(struct PacketFifo* array, int threads)
    {
        while(threads--)
            packet_fifo_destroy(array + threads);
        free(array);
    }
    static int run_producer(struct PacketFifo* fifos, int consumer_threads)
    {
        int err;
        struct SensorPacket packet;
        while(! (err = sensor_packet_next(&packet))) {
            struct PacketFifo* fifo = fifos + packet.sensor_id % consumer_threads;
            if((err = packet_fifo_push(fifo, &packet)) != 0) {
                sensor_packet_free(&packet);
                break;
            }
        }
        while(consumer_threads--)
            packet_fifo_close(fifos + consumer_threads);
        return err;
    }
    static int run_consumer(struct PacketFifo* fifo)
    {
        int err;
        struct SensorPacket packet;
        while(! (err = packet_fifo_pop(fifo, &packet))) {
            err = sensor_packet_process(&packet);
            sensor_packet_free(&packet);
            if(err)
                goto err_close;
        }
        if(err == EPIPE) /* producer closed */
            return 0;
      err_close:
        packet_fifo_close(fifo); /* notify producer */
        return err;
    }
    

    Again, we can use OpenMP's thread pool to launch the whole endeavor.

    #include <omp.h>
    
    int run_producer_consumer(int sensor_count)
    {
        int max_threads, consumer_threads, err;
        struct PacketFifo* fifos;
        max_threads = omp_get_max_threads();
        if(max_threads < 2)
            return EAGAIN;
        consumer_threads = max_threads <= sensor_count ?
        max_threads - 1 : sensor_count;
        if((err = alloc_fifos(consumer_threads, &fifos)) != 0)
            return err;
    #   pragma omp parallel num_threads(consumer_threads + 1)
        {
            int threadnum = omp_get_thread_num();
            if(threadnum)
                run_consumer(fifos + threadnum - 1);
            else
                err = run_producer(fifos, consumer_threads);
        }
        free_fifos(fifos, consumer_threads);
        return err;
    }
    

    Follow-up questions

    Can you please explain what is the semantics for packet_fifo_close?

    We need a way to stop the process and also deal with errors, unless we don't care about either. In particular, the producer may run out of elements to produce. In either case, the side of the FIFO that does not continue, closes said FIFO.

    At that point, the rules are pretty straightforward: The producer can no longer push elements into the FIFO since there is no guarantee a consumer will get to them. On the other hand, a consumer will still read the elements remaining in the FIFO before getting the return value indicating the closed FIFO. That means the remaining elements in the FIFO are drained once the producer stops.

    The closed condition is signalled with the error return value EPIPE, just to mimic similar semantics in a Unix pipe.

    I have modified the #pragma omp parallel to launch consumers only when we have FIFO_SIZE * max_threads -1 packets already pushed to fifo

    Not a good idea. You want to overlap production and consumption as much as possible because this maximizes parallelization. It also prevents potential deadlocks when the FIFOs get full before your launch criterium is reached

    Would you be able to modify the code to showcase reading from a binary file where sensor_id is 2 bytes and sensor_value is next 2 bytes and sensor_sequence is next 2 bytes? So 6 byte packet of concatenated binary data. and sensor_packet_process can just print the sequence number and value

    Sure. You didn't specify whether these values are little-endian or big-endian so I assume the native machine order.

    struct SensorPacket
    {
        short sensor_id, sensor_value, sensor_sequence;
    };
    #include <stdio.h>
    
    int sensor_packet_next(FILE* infile, struct SensorPacket* out)
    {
        if(fread(out, sizeof(*out), 1, infile))
            return 0; /* success */
        if(ferror(infile))
            return EIO;
        return ENOENT; /* end of file */
    }
    int sensor_packet_process(struct SensorPacket* in)
    {
        if(printf("%d %d %d\n", in->sensor_id, in->sensor_sequence,
              in->sensor_value) < 0)
            return EIO;
        return 0;
    }
    void sensor_packet_free(struct SensorPacket*)
    { /* nothing to do */ }
    
    static int run_producer(
          FILE* infile, struct PacketFifo* fifos, int consumer_threads)
    {
        ...
        while(! (err = sensor_packet_next(infile, &packet))) {
        ...
    }
    int run_producer_consumer(FILE* infile, int sensor_count)
    {
        ...
        err = run_producer(infile, fifos, consumer_threads);
        ...
    }
    int main(int argc, char** argv)
    {
        FILE* infile;
        int err, sensor_count = 5000;
        if(argc != 2) {
            fprintf(stderr, "Usage: %s INPUT_FILE\n",
                  argc ? argv[0] : "executable");
            return 1;
        }
        if(! (infile = fopen(argv[1], "rb")) {
            fprintf(stderr, "Cannot open file: '%s'\n", argv[1]);
            return 2;
        }
        err = run_producer_consumer(infile, sensor_count);
        fclose(infile);
        switch(err) {
        case 0: break; /* shouldn't happen */
        case ENOENT: break; /* no error, file end reached */
        default:
            errno = err;
            perror(NULL);
            return 3;
        }
        return 0;
    }
    

    Side note: A-priori knowledge of the number of sensors isn't needed. We might as well launch like this:

    int run_producer_consumer(FILE* infile)
    {
        int consumer_threads, err;
        struct PacketFifo* fifos;
        consumer_threads = omp_get_max_threads() - 1;
        if(consumer_threads < 1) /* not enough threads available */
            return EAGAIN;
        if((err = alloc_fifos(consumer_threads, &fifos)) != 0)
            return err;
    #   pragma omp parallel
        {
            int threadnum = omp_get_thread_num();
            if(threadnum)
                run_consumer(fifos + threadnum - 1);
            else
                err = run_producer(infile, fifos, consumer_threads);
        }
        free_fifos(fifos, consumer_threads);
        return err;
    }
    

    My number of sensors is >> than number of threads. My threads are 32 and sensors are > 5000 So I think I am facing a deadlock as the producer has not produced data for a sensor_id%threads and is waiting to be signaled when another consumer thread enters and deadlocks waiting for mutex.

    That shouldn't happen assuming you launch producers and consumers at the same time. Here is a simple producer without file IO that I built and that works fine on my 16 thread system.

    int sensor_packet_next(struct SensorPacket* out)
    {
        static int next_sensor, next_sensor_sequence;
        out->sensor_id = (short) next_sensor;
        next_sensor = next_sensor;
        out->sensor_value = (short) rand();
        out->sensor_sequence = (short) next_sensor_sequence;
        if(++next_sensor == 5000) {
            next_sensor = 0;
            next_sensor_sequence++;
        }
        return 0;
    }
    

    The worst that can happen is that you have more than 64 times the same sensor ID in a row (or one sensor ID modulo consumer count). Then the producer would fill one FIFO to the brim and has to wait until the consumer drains the FIFO while all other consumers are idle.

    In that case increase the FIFO size until it is larger than the maximum number expected in a row. Or switch to a dynamically growing FIFO. But then you have to be careful again to not overload memory.