c++ boost boost-interprocess

boost::interprocess: how to implement a simple thread-safe job queue for worker processes


I'm attempting to create a basic system on Windows in which worker processes take jobs from a shared queue, using Boost.Interprocess. When a worker process is free, it takes the next job from the queue in shared memory. The code is loosely copied from examples in the documentation.

I have a child process that attempts to take jobs from a queue stored in shared memory inside a Jobs object. The issue is that it crashes as soon as the child attempts to read the front of the queue in SafeQueue::next() at elem = q.front(); (commented below). The child process is meant to terminate when the queue is empty (when next() returns -999).

I feel like I'm doing something horribly wrong. I'm new to Boost IPC and would appreciate any pointers or advice on how to achieve this simple worker queue system.

#include <boost/interprocess/windows_shared_memory.hpp>
#include <boost/interprocess/managed_windows_shared_memory.hpp>
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <string>
#include <thread>
#include <iostream>
#include <mutex>
#include <queue>
#include <vector>
#include <cstdlib>

using namespace boost::interprocess;


class SafeQueue {
    std::queue<int> q;
    std::mutex m;
public:
    SafeQueue() {}

    void push(int elem) {
        m.lock();
        q.push(elem);
        m.unlock();
    }

    void push(std::vector<int> elem) {
        m.lock();
        for (int e : elem) {
            q.push(e);
        }
        m.unlock();
    }

    int next() {
        int elem = -999;
        m.lock();
        if (!q.empty()) {
            elem = q.front(); //crashes here
            q.pop();
        }
        m.unlock();
        return elem;
    }
};


class Jobs
{
public:
    SafeQueue queue;
};

typedef managed_shared_ptr<Jobs, managed_windows_shared_memory>::type my_shared_ptr;

int main(int argc, char* argv[])
{

    if (argc == 1) {  //Parent process
        std::cout << "starting as parent" << std::endl;
        managed_windows_shared_memory segment(create_only, "MySharedMemory", 4096);

        my_shared_ptr sh_ptr = make_managed_shared_ptr(segment.construct<Jobs>("object to share")(), segment);
        sh_ptr->queue.push({1, 2, 3});
        std::string command = "\"" + std::string(argv[0]) + "\"";
        command += " child ";

        std::thread t([](const std::string& command) {
            std::system(command.c_str());
            }, command);
        // Wait for the child to finish; the shared memory stays alive while this process holds the segment.
        t.join();
    }
    else {
        std::cout << "starting as child" << std::endl;

        //Open already created shared memory object.
        managed_windows_shared_memory shm(open_only, "MySharedMemory");
        Jobs* shared_job_list = shm.find<Jobs>("object to share").first;
        
        std::vector<int> taken;
        
        while (true) {
            int result;
            if ((result = shared_job_list->queue.next()) != -999) {
                taken.push_back(result);
                std::cout << "took job " << result << std::endl;
                continue;
            }
            break;
        }
        std::string out = "taken jobs: ";
        for (int res : taken) {
            out += ", " + std::to_string(res);
        }
        std::cout << out << std::endl;
        return 0;
    }
    return 0;
}

Solution

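  • Anything placed in the shared segment must be pointer-free (or use offset pointers) to work across processes, and Jobs is not. Its std::queue allocates its elements on the parent's private heap and keeps raw pointers to them, so the child finds a queue object whose internal pointers refer to memory that does not exist in its own address space, and q.front() crashes. The std::mutex has the same limitation: it is only meant to synchronize threads within one process. Use a container whose allocator draws from the segment, such as boost::interprocess::deque with a boost::interprocess::allocator, and guard it with boost::interprocess::interprocess_mutex.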
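
To illustrate what "pointer-free" means in practice, here is a minimal sketch of a fixed-capacity queue that keeps its jobs in an in-object array and tracks them with indices instead of pointers. It uses only the standard library so it can be tried on its own. The names FixedJobQueue, SpinGuard and construct_in are hypothetical and not part of Boost or of the question's code, and the std::atomic_flag spin lock is an assumption made for brevity (a process that dies while holding it leaves it locked); with Boost.Interprocess an interprocess_mutex would be the more usual choice.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <new>

// Hypothetical fixed-capacity job queue. All of its state lives inside the
// object itself (an array plus two indices), so it contains no pointers and
// stays valid at whatever address a process maps the shared segment.
template <std::size_t Capacity>
class FixedJobQueue {
    int jobs_[Capacity] = {};
    std::size_t head_ = 0;   // index of the oldest job
    std::size_t count_ = 0;  // number of jobs currently stored
    std::atomic_flag lock_ = ATOMIC_FLAG_INIT;

    // Minimal RAII spin lock (an assumption for this sketch, see lead-in).
    struct SpinGuard {
        std::atomic_flag& flag;
        explicit SpinGuard(std::atomic_flag& f) : flag(f) {
            while (flag.test_and_set(std::memory_order_acquire)) {
                // busy-wait until the current holder clears the flag
            }
        }
        ~SpinGuard() { flag.clear(std::memory_order_release); }
    };

public:
    static constexpr int kEmpty = -999;  // same sentinel as the question

    // Appends a job; returns false when the queue is full.
    bool push(int job) {
        SpinGuard guard(lock_);
        if (count_ == Capacity) return false;
        jobs_[(head_ + count_) % Capacity] = job;
        ++count_;
        return true;
    }

    // Removes and returns the oldest job, or kEmpty when there is none.
    int next() {
        SpinGuard guard(lock_);
        if (count_ == 0) return kEmpty;
        int job = jobs_[head_];
        head_ = (head_ + 1) % Capacity;
        --count_;
        return job;
    }
};

// Builds the queue inside caller-supplied storage, standing in for the
// memory a shared segment would provide.
template <std::size_t Capacity>
FixedJobQueue<Capacity>* construct_in(void* storage) {
    assert(storage != nullptr);
    return new (storage) FixedJobQueue<Capacity>();
}
```

In the question's program, a type like this could presumably be created by the parent with segment.construct<FixedJobQueue<64>>("object to share")() and located by the child with shm.find<FixedJobQueue<64>>("object to share").first, since nothing inside it refers to the parent's heap. The trade-off is a capacity fixed at compile time; a growable queue needs a segment allocator instead.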