I'm attempting to create a basic system in which worker processes take jobs from a shared queue, using Boost.Interprocess on Windows. When a worker process is free, it takes the next job from the shared queue area. The code is loosely based on examples in the Boost documentation.
I have a child process that attempts to take jobs from a queue stored in shared memory as `Jobs`. The issue is that it crashes as soon as the child attempts to read the front of the queue in `SafeQueue::next()`, at `elem = q.front();` (marked with a comment below). The child process is meant to terminate when the queue is empty (when `next()` returns -999).
I feel like I'm doing something horribly wrong. I'm new to Boost IPC and would appreciate any pointers or advice on how to achieve this simple worker queue system.
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

#include <boost/interprocess/managed_windows_shared_memory.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/smart_ptr/shared_ptr.hpp>
#include <boost/interprocess/windows_shared_memory.hpp>
using namespace boost::interprocess;
/// FIFO queue of ints guarded by a mutex.
///
/// Every operation holds the mutex through a std::lock_guard, so the lock is
/// released even if the underlying container throws (e.g. std::bad_alloc
/// while growing).
///
/// NOTE(review): this type is only safe to share between threads of ONE
/// process. std::queue keeps its elements in heap memory reached through
/// internal pointers, so an instance placed in a shared-memory segment cannot
/// be read from another process. std::mutex is presumably process-local as
/// well -- verify, and switch to shared-memory-aware container and mutex types
/// before using this across processes.
class SafeQueue {
    std::queue<int> q;
    std::mutex m;

public:
    SafeQueue() {}

    /// Appends a single element to the back of the queue.
    void push(int elem) {
        std::lock_guard<std::mutex> guard(m);
        q.push(elem);
    }

    /// Appends every element of `elem`, in order, under a single lock
    /// acquisition so the batch is not interleaved with other pushes.
    void push(const std::vector<int>& elem) {
        std::lock_guard<std::mutex> guard(m);
        for (int e : elem) {
            q.push(e);
        }
    }

    /// Removes and returns the front element.
    /// @return the oldest queued value, or -999 when the queue is empty.
    int next() {
        std::lock_guard<std::mutex> guard(m);
        if (q.empty()) {
            return -999; // sentinel callers use to detect "no more jobs"
        }
        const int elem = q.front();
        q.pop();
        return elem;
    }
};
/// Object constructed in the shared segment; wraps the job queue that
/// workers pull from.
struct Jobs {
    SafeQueue queue;
};
// Shared-pointer type for a Jobs object living in a managed Windows shared-memory segment.
using my_shared_ptr = managed_shared_ptr<Jobs, managed_windows_shared_memory>::type;
/// Entry point for both roles of the demo.
///
/// With no extra arguments the process acts as the parent: it creates the
/// shared segment, constructs a Jobs object in it, queues jobs 1..3 and
/// re-launches its own executable with an extra argument. With any extra
/// argument it acts as the child: it opens the segment, drains the queue until
/// next() reports -999, and prints what it took.
///
/// @return 0 on success, 1 if the child cannot find the shared Jobs object.
int main(int argc, char* argv[])
{
    if (argc == 1) { // Parent process
        std::cout << "starting as parent" << std::endl;
        managed_windows_shared_memory segment(create_only, "MySharedMemory", 4096);
        my_shared_ptr sh_ptr = make_managed_shared_ptr(segment.construct<Jobs>("object to share")(), segment);
        sh_ptr->queue.push({1, 2, 3});

        std::string command = "\"" + std::string(argv[0]) + "\"";
        command += " child ";
        std::thread t([](const std::string& command) {
            std::system(command.c_str());
        }, command);

        // Wait for the child instead of spinning forever: `segment` and
        // `sh_ptr` stay alive for as long as the child runs, and the parent
        // then exits cleanly without burning a CPU core.
        t.join();
        return 0;
    }

    std::cout << "starting as child" << std::endl;
    // Open already created shared memory object.
    managed_windows_shared_memory shm(open_only, "MySharedMemory");
    // NOTE(review): Jobs must be pointer-free and use process-shared
    // synchronisation for the accesses below to be valid from this process.
    Jobs* shared_job_list = shm.find<Jobs>("object to share").first;
    if (shared_job_list == nullptr) {
        // find() yields a null pointer when no object has that name.
        std::cerr << "shared Jobs object not found" << std::endl;
        return 1;
    }

    std::vector<int> taken;
    for (int result = shared_job_list->queue.next(); result != -999;
         result = shared_job_list->queue.next()) {
        taken.push_back(result);
        std::cout << "took job " << result << std::endl;
    }

    // Format the numbers explicitly: `", " + res` would offset the string
    // literal's pointer by `res` rather than append the number.
    std::string out = "taken jobs: ";
    bool first = true;
    for (int res : taken) {
        if (!first) {
            out += ", ";
        }
        out += std::to_string(res);
        first = false;
    }
    std::cout << out << std::endl;
    return 0;
}
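The internal data of the shared `Jobs` object must be pointer-free to work across multiple processes, but it is not, because it contains a `std::queue`. A `std::queue` allocates its elements on the creating process's heap and stores only pointers to them, so the bytes placed in shared memory contain addresses that are meaningful only in the parent; when the child dereferences them in `q.front()`, it reads memory that is not mapped in its own address space and crashes. The same applies to `std::mutex`, which is not designed to be shared between processes. Use a container whose storage comes from the segment itself (for example `boost::interprocess::deque` with an `allocator` bound to the segment manager) together with `boost::interprocess::interprocess_mutex`.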