c++boostshared-memoryinterprocess

C++ shared memory among forked processes (what do i do wrong)


I am making a basic program when the main process creates processes with fork that take the first element from a list and print it. I used boost library to manage shared memory, but this is my first time using such tools and i may have done things wrong.

#include <iostream>
#include <list>
#include <unistd.h>
#include <sys/wait.h>
#include <vector>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
using namespace std;
using namespace boost::interprocess;


struct shared_data{
    interprocess_mutex mtx;
    list<int> qqueue;

};


void modify_list(shared_data*& sdat);


int main(){
    shared_memory_object shm(open_or_create, "shm", read_write);
    shm.truncate(sizeof(shared_data));

    mapped_region reg(shm, read_write);
    void* address=reg.get_address();
    shared_data* shptr=new (address) shared_data;
    
    vector<pid_t> processors;
    size_t num_processors=4;
    
    for(int j=1; j<8; ++j){
        shptr->qqueue.push_back(j);
    }

    for(size_t i=0; i<num_processors; ++i){
        pid_t pid=fork();
        if(pid==0){
            modify_list(shptr);
        }
        else if(pid>0){
            processors.push_back(pid);
        }
    }

    for (pid_t cpu : processors) {
        int status;
        waitpid(cpu, &status, 0);
    }
}






void modify_list(shared_data*& sdat){
    while(true){
        sdat->mtx.lock();
        if(sdat->qqueue.empty()){
            sdat->mtx.unlock();
            break;
        }
        int ff = sdat->qqueue.front();
        sdat->qqueue.pop_front();
        sdat->mtx.unlock();
        cout<<"Process: "<<getpid()<<" dequeued "<<ff<<"\n";
        cout.flush();
    }
}

I am getting the following output:

Process: 320 dequeued 1
Process: 320 dequeued 2
Process: 320 dequeued 3
Process: 320 dequeued 4
Process: 320 dequeued 5
Process: 320 dequeued 6
Process: 320 dequeued 7
Process: 321 dequeued 2

since 2 is already dequeued by process 320 why is it again dequeued by 321, while 2 only exists once in the list.


Solution

  • well i have solved my problem (more than 3 months ago) thanks to the comments (not the answer it fell overengineered) anyways in case anyone else finds this question useful here is the simplest way i solved it then: I had to use boost::container::list to store my data so that it could handle an allocator like the boost::interprocess:: allocator which forces the list to store its results in a shared memory region. In the main function a managed_shared_memory object is created to make a space in the shared region of 64KiB (too big for what I want I guess unless the list gets bigger) then this region should be assigned to a pointer that points to a shared_data object.

    #include <iostream>
    #include <vector>
    #include <sys/wait.h>
    #include <unistd.h>
    #include <boost/interprocess/shared_memory_object.hpp>
    #include <boost/interprocess/mapped_region.hpp>
    #include <boost/interprocess/sync/interprocess_mutex.hpp>
    #include <boost/interprocess/containers/list.hpp>
    #include <boost/interprocess/managed_shared_memory.hpp>
    #include <boost/interprocess/allocators/allocator.hpp>
    #define SHARED_SIZE 65536
    using namespace std;
    namespace bip=boost::interprocess;
    namespace bc=boost::container;
    
    
    struct shared_data{
        using shared_allocator_t=bip::allocator<int, bip::managed_shared_memory::segment_manager>;
        using shared_list_t=bc::list<int, shared_allocator_t>;
        shared_list_t slist;
        bip::interprocess_mutex mtx;
    
        shared_data(const shared_allocator_t& alloc): slist(alloc) {}
    };
    
    void modify_list(shared_data*& shptr){
        int c=-1;
        shptr->mtx.lock();
        if(!shptr->slist.empty()){
            c=shptr->slist.front();
            shptr->slist.pop_front();
        }
        shptr->mtx.unlock();
        cout<<"Process: "<<getpid()<<" dequeued: "<<c<<"\n";
        exit(0);
    }
    
    int main(){
        bip::shared_memory_object::remove("shm");
        bip::managed_shared_memory shm(bip::open_or_create, "shm", SHARED_SIZE);
        shared_data* shptr = shm.find_or_construct<shared_data>("SharedData")(shm.get_segment_manager());
        vector<pid_t> processes;
        for(int j=0; j<8; ++j){
            shptr->slist.push_back(j);
        }
        for(int j=0; j<6; ++j){
            pid_t pid=fork();
            if(pid==0){
                modify_list(shptr);
            }
            else{
                processes.push_back(pid);
            }
        }
    
        for(pid_t proc : processes){
            waitpid(proc, nullptr, 0);
        }
        cout<<"Father exits\n";
    }
    

    output:

    Process: 6762 dequeued: 0
    Process: 6763 dequeued: 1
    Process: 6764 dequeued: 2
    Process: 6765 dequeued: 3
    Process: 6766 dequeued: 4
    Process: 6767 dequeued: 5
    Father exits
    

    `