c++boostshared-memoryinterprocess

C++ shared memory among forked processes (what do i do wrong)


I am making a basic program when the main process creates processes with fork that take the first element from a list and print it. I used boost library to manage shared memory, but this is my first time using such tools and i may have done things wrong.

#include <iostream>
#include <list>
#include <unistd.h>
#include <sys/wait.h>
#include <vector>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
using namespace std;
using namespace boost::interprocess;


struct shared_data{
    interprocess_mutex mtx;
    list<int> qqueue;

};


void modify_list(shared_data*& sdat);


int main(){
    shared_memory_object shm(open_or_create, "shm", read_write);
    shm.truncate(sizeof(shared_data));

    mapped_region reg(shm, read_write);
    void* address=reg.get_address();
    shared_data* shptr=new (address) shared_data;
    
    vector<pid_t> processors;
    size_t num_processors=4;
    
    for(int j=1; j<8; ++j){
        shptr->qqueue.push_back(j);
    }

    for(size_t i=0; i<num_processors; ++i){
        pid_t pid=fork();
        if(pid==0){
            modify_list(shptr);
        }
        else if(pid>0){
            processors.push_back(pid);
        }
    }

    for (pid_t cpu : processors) {
        int status;
        waitpid(cpu, &status, 0);
    }
}






void modify_list(shared_data*& sdat){
    while(true){
        sdat->mtx.lock();
        if(sdat->qqueue.empty()){
            sdat->mtx.unlock();
            break;
        }
        int ff = sdat->qqueue.front();
        sdat->qqueue.pop_front();
        sdat->mtx.unlock();
        cout<<"Process: "<<getpid()<<" dequeued "<<ff<<"\n";
        cout.flush();
    }
}

I am getting the following output:

Process: 320 dequeued 1
Process: 320 dequeued 2
Process: 320 dequeued 3
Process: 320 dequeued 4
Process: 320 dequeued 5
Process: 320 dequeued 6
Process: 320 dequeued 7
Process: 321 dequeued 2

since 2 is already dequeued by process 320 why is it again dequeued by 321, while 2 only exists once in the list.


Solution

  • well i have solved my problem (more then 20 days ago) thanks to the comments (not the answer it fell overengineered) anyways in case anyone else finds this question useful here is the simplest way i solved it then: I had to use boost::container::list to store my data so that it could handle an allocator like the boost::interprocess:: allocator which forces the list to store its results in a shared memory region. In the main function a managed_shared_memory object is created to make a space in the shared region of 64KiB (too big for what I want I guess unless the list gets bigger) then this region should be assigned to a pointer that points to a shared_data object.

    #include <iostream>
    #include <vector>
    #include <sys/wait.h>
    #include <unistd.h>
    #include <boost/interprocess/shared_memory_object.hpp>
    #include <boost/interprocess/mapped_region.hpp>
    #include <boost/interprocess/sync/interprocess_mutex.hpp>
    #include <boost/interprocess/containers/list.hpp>
    #include <boost/interprocess/managed_shared_memory.hpp>
    #include <boost/interprocess/allocators/allocator.hpp>
    #define SHARED_SIZE 65536
    using namespace std;
    namespace bip=boost::interprocess;
    namespace bc=boost::container;
    
    
    struct shared_data{
        using shared_allocator_t=bip::allocator<int, bip::managed_shared_memory::segment_manager>;
        using shared_list_t=bc::list<int, shared_allocator_t>;
        shared_list_t slist;
        bip::interprocess_mutex mtx;
    
        shared_data(const shared_allocator_t& alloc): slist(alloc) {}
    };
    
    void modify_list(shared_data*& shptr){
        int c=-1;
        shptr->mtx.lock();
        if(!shptr->slist.empty()){
            c=shptr->slist.front();
            shptr->slist.pop_front();
        }
        shptr->mtx.unlock();
        cout<<"Process: "<<getpid()<<" dequeued: "<<c<<"\n";
        exit(0);
    }
    
    int main(){
        bip::shared_memory_object::remove("shm");
        bip::managed_shared_memory shm(bip::open_or_create, "shm", SHARED_SIZE);
        shared_data* shptr = shm.find_or_construct<shared_data>("SharedData")(shm.get_segment_manager());
        vector<pid_t> processes;
        for(int j=0; j<8; ++j){
            shptr->slist.push_back(j);
        }
        for(int j=0; j<6; ++j){
            pid_t pid=fork();
            if(pid==0){
                modify_list(shptr);
            }
            else{
                processes.push_back(pid);
            }
        }
    
        for(pid_t proc : processes){
            waitpid(proc, nullptr, 0);
        }
        cout<<"Father exits\n";
    }
    

    output:

    Process: 6762 dequeued: 0
    Process: 6763 dequeued: 1
    Process: 6764 dequeued: 2
    Process: 6765 dequeued: 3
    Process: 6766 dequeued: 4
    Process: 6767 dequeued: 5
    Father exits
    

    `