I'm trying to implement the classic producer-consumer circular buffer example with one producer and one consumer.
However, I'm running into a problem whose cause I can't identify: the producer seems to work, but the consumer always reads a 0. I suspected a problem with the shared memory and/or the buffering, but both appear to be implemented correctly.
Code explanation:
-Initially I create a shared memory area the size of the STRUCT_BUFFER defined in the code. At this point the struct is only used to provide the size.
-I create 3 semaphores: mutex, full, empty.
-fork(): the child (consumer) repeatedly gets a value from the buffer; if the value is -1 (termination token), it puts the token back, leaves the loop and terminates. The parent (producer) runs a for loop that puts each value on the buffer in turn and finally puts a -1 (termination token).
-The parent then waits until the consumer has finished and removes the semaphores. The get and put functions are the classic producer-consumer code.
P.S. The SYSC() macro is used only for error handling.
#include <sys/wait.h>
#include <sys/mman.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>
#include <unistd.h>
#include <time.h>
#include <semaphore.h>
#define SHM_NAME "/shm8"
#define MUTEX "/mutex"
#define FULL "/full"
#define EMPTY "/empty"
#define N 10
#define SYSC(value, command, message) if ((value=command) == -1){perror(message); exit(errno);}
#define SYSCN(value, command, message) if ((value=command) == NULL){perror(message); exit(errno);}
typedef struct{
    int B[N];
    int *head;
    int *tail;
} STRUCT_BUFFER;
void put(int, int*, int*, int*, sem_t*, sem_t*, sem_t*);
int get(int*, int*, int*, sem_t*, sem_t*, sem_t*);
int main(){
    int ret_value, mem_fd;
    void * ptr;
    pid_t pid;
    sem_t * mutex;
    sem_t * full;
    sem_t * empty;
    int * head;
    int * tail;
    // Creating shared memory area
    SYSC(mem_fd, shm_open(SHM_NAME, O_CREAT | O_RDWR, 0666), "in shm_open");
    SYSC(ret_value, ftruncate(mem_fd, sizeof(STRUCT_BUFFER)), "in ftruncate");
    // Memory mapping
    SYSCN(ptr, mmap(NULL, sizeof(STRUCT_BUFFER), PROT_READ | PROT_WRITE, MAP_SHARED, mem_fd, 0), "in mmap");
    head = tail = ptr;
    // Closing file descriptor
    SYSC(ret_value, close(mem_fd), "in close");
    // Creating semaphores
    mutex = sem_open(MUTEX, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, 1);
    if(mutex == SEM_FAILED) {
        perror("In sem_open");
        exit(EXIT_FAILURE);
    }
    full = sem_open(FULL, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, N);
    if(full == SEM_FAILED) {
        perror("In sem_open");
        exit(EXIT_FAILURE);
    }
    empty = sem_open(EMPTY, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, 0);
    if(empty == SEM_FAILED) {
        perror("In sem_open");
        exit(EXIT_FAILURE);
    }
    // Creating a new process
    SYSC(pid, fork(), "in fork");
    if(!pid) {
        // Consumer code
        printf("Consumer %d starts\n", getpid());
        int toGet;
        while(1) {
            toGet = get(ptr, head, tail, mutex, full, empty);
            printf("Consumer %d consumed %d\n", getpid(), toGet);
            // If termination token
            if(toGet == -1) {
                put(toGet, ptr, head, tail, mutex, full, empty);
                break;
            }
        }
        sleep(1);
        printf("Consumer %d finishes\n",getpid());
        exit(EXIT_SUCCESS);
    } else {
        // Producer code
        printf("Producer %d starts\n",getpid());
        int toPut;
        int exitToken = -1;
        // Production
        for(int i = 0 ; i < N ; i++) {
            toPut = i + 1;
            put(toPut, ptr, head, tail, mutex, full, empty);
            printf("Producer %d produced %d. Cycle:%d\n", getpid(), toPut, i);
            sleep(1);
        }
        // Termination token
        put(exitToken, ptr, head, tail, mutex, full, empty);
        printf("Producer %d produced termination token\n", getpid());
        sleep(1);
        // Wait
        SYSC(ret_value, wait(NULL), "in wait");
        // Closing semaphores
        sem_close(mutex);
        sem_close(empty);
        sem_close(full);
        sem_unlink(MUTEX);
        sem_unlink(FULL);
        sem_unlink(EMPTY);
        // Closing shared memory
        munmap(ptr, sizeof(STRUCT_BUFFER));
        printf("Producer %d finishes\n",getpid());
        exit(EXIT_SUCCESS);
    }
}
void put(int toPut, int * ptr, int * head, int * tail, sem_t * mutex, sem_t * full, sem_t * empty) {
    sem_wait(full);
    sem_wait(mutex);
    // Critical section
    ptr[*tail] = toPut;
    *tail = ((*tail) + 1) % N;
    sem_post(mutex);
    sem_post(empty);
}
int get(int * ptr, int * head, int * tail, sem_t * mutex, sem_t * full, sem_t * empty) {
    sem_wait(empty);
    sem_wait(mutex);
    // Critical section
    int toGet;
    toGet = ptr[*head];
    *head = ((*head) + 1) % N;
    sem_post(mutex);
    sem_post(full);
    return toGet;
}
Having inferred suitable definitions for your SYSC and SYSCN macros, I was able to build your program and confirm the misbehavior you observed.
The primary error has to do with the head and tail pointers. When you declare them as pointers in main and initialize them like so:
head = tail = ptr;
you are treating them as if they were pointers to the head and tail elements of the buffer themselves. In that case it would be OK for the head and tail pointers to initially alias each other and the base pointer.
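In your get() and put() functions, however, you are treating head and tail as pointers to variables storing the indices of the head and tail elements. For example:
ptr[*tail] = toPut;
*tail = ((*tail) + 1) % N;
For that use, it is not OK for those pointers to alias each other or the data. With both of them pointing at ptr[0], the index update overwrites the value that was just stored there, and the following get() reads from whatever slot that shared index now names, typically one that was never written. That is consistent with the consumer always seeing 0. The pointers need to point to their own, separate objects.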
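To make the aliasing concrete, here is a minimal single-process sketch that replays one put() followed by one get() with head and tail both pointing at element 0 of a zeroed buffer. The function name aliased_put_then_get is hypothetical and not part of your program; the semaphores are left out because they play no role in this effect.

```c
#include <assert.h>

#define N 10

/* Hypothetical helper (not from the question): replays one put() and one
   get() on a zero-filled buffer whose head and tail pointers both alias
   element 0, mirroring head = tail = ptr. */
int aliased_put_then_get(int value) {
    /* Keep the computed index non-negative for this illustration. */
    assert(value >= -1);

    int buf[N] = {0};
    int *head = buf;
    int *tail = buf;

    /* Body of put(): */
    buf[*tail] = value;        /* stores into buf[0], which is also *tail */
    *tail = (*tail + 1) % N;   /* overwrites the value that was just stored */

    /* Body of get(): */
    int got = buf[*head];      /* *head is now (value + 1) % N, not 0 */
    *head = (*head + 1) % N;

    return got;
}
```

For the values your producer writes (1 to N, then -1), this helper returns 0 instead of the value that was put, which matches the output you reported.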
As a secondary matter, the head and tail indices (ints, not pointers) need to be shared between the processes, because the consumer process accesses both. In the original code, the head and tail pointers used are both local variables of main, and so not shared between processes (each process has its own, independent copies of these). It looks like you might have planned to share the pointers via the shared STRUCT_BUFFER object, but ended up not doing so. That object would be an appropriate place to put the head and tail indices that you need to share.
Additionally, I would recommend:
-using an anonymous memory mapping instead of creating and mapping a POSIX shared memory segment. Aside from being simpler, that will eliminate issues that could arise from the shared memory segment living longer than you intended / needed or being shared by distinct instances of your program running concurrently.
-using unnamed (process-shared) semaphores stored in the shared memory region instead of named semaphores. This is slightly simpler, and eliminates issues that could arise from your semaphores living longer than you intended or being shared by distinct instances of your program running concurrently. These objects would need to be stored in shared memory, but you already have a setup for that. Just add them to STRUCT_BUFFER.
-modifying get() and put() to accept a pointer to the STRUCT_BUFFER instead of all those individual arguments. I strongly recommend this: once all of the above is implemented, your STRUCT_BUFFER object will contain all the information these functions need to work with, other than the particular value that put() should store.
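Putting these recommendations together, one possible shape is sketched below. It assumes Linux, where sem_init() supports process-shared semaphores. The semaphore names and their meanings follow your code (full counts free slots, empty counts stored items), while run_demo is a hypothetical driver of my own that replaces your main for illustration: the parent produces 1..count and then -1, and the child checks that it receives exactly that sequence.

```c
#define _DEFAULT_SOURCE   /* for MAP_ANONYMOUS on strict glibc builds */
#include <semaphore.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define N 10

typedef struct {
    int B[N];
    int head;      /* index of the next element to read */
    int tail;      /* index of the next free slot */
    sem_t mutex;   /* protects B, head and tail */
    sem_t full;    /* free slots, initial value N (as in the question) */
    sem_t empty;   /* stored items, initial value 0 (as in the question) */
} STRUCT_BUFFER;

void put(STRUCT_BUFFER *buf, int toPut) {
    sem_wait(&buf->full);
    sem_wait(&buf->mutex);
    buf->B[buf->tail] = toPut;
    buf->tail = (buf->tail + 1) % N;
    sem_post(&buf->mutex);
    sem_post(&buf->empty);
}

int get(STRUCT_BUFFER *buf) {
    sem_wait(&buf->empty);
    sem_wait(&buf->mutex);
    int toGet = buf->B[buf->head];
    buf->head = (buf->head + 1) % N;
    sem_post(&buf->mutex);
    sem_post(&buf->full);
    return toGet;
}

/* Hypothetical driver: returns 0 if the consumer received 1..count in
   order followed by the -1 token, and -1 otherwise. */
int run_demo(int count) {
    /* Zero-filled anonymous mapping shared with the child after fork(). */
    STRUCT_BUFFER *buf = mmap(NULL, sizeof *buf, PROT_READ | PROT_WRITE,
                              MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (buf == MAP_FAILED) {
        return -1;
    }
    /* Second argument 1 makes each semaphore usable across processes. */
    if (sem_init(&buf->mutex, 1, 1) == -1 ||
        sem_init(&buf->full, 1, N) == -1 ||
        sem_init(&buf->empty, 1, 0) == -1) {
        munmap(buf, sizeof *buf);
        return -1;
    }

    pid_t pid = fork();
    if (pid == -1) {
        munmap(buf, sizeof *buf);
        return -1;
    }
    if (pid == 0) {
        /* Consumer: keep reading until the token so the producer never
           blocks forever, and remember whether the order was right. */
        int expected = 1, ok = 1, value;
        while ((value = get(buf)) != -1) {
            if (value != expected) {
                ok = 0;
            }
            expected++;
        }
        if (expected != count + 1) {
            ok = 0;
        }
        _exit(ok ? 0 : 1);
    }

    /* Producer */
    for (int i = 1; i <= count; i++) {
        put(buf, i);
    }
    put(buf, -1);

    int status = 0, result = -1;
    if (waitpid(pid, &status, 0) == pid &&
        WIFEXITED(status) && WEXITSTATUS(status) == 0) {
        result = 0;
    }
    sem_destroy(&buf->mutex);
    sem_destroy(&buf->full);
    sem_destroy(&buf->empty);
    munmap(buf, sizeof *buf);
    return result;
}
```

Calling run_demo(25) from main exercises both wrap-around and blocking on a full buffer, since 25 exceeds N. On older glibc versions the program may need to be linked with -pthread. Nothing here has a name in the filesystem, so there is nothing to unlink afterwards and no interference between concurrent runs.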