I have a queue of jobs that should be done asynchronously (actually send HTTP requests, if it matters) and I want an executer thread to suspend until new job arrives if the queue is empty. I hope the following snippet will explain what I want:
#include <Windows.h>
#include <iostream>
#include <vector>
#include <mutex>
#include <memory>
#include <stack>
#include <queue>
class Job{
};
std::queue<Job> jobs;
std::unique_ptr<std::mutex> mutex = std::make_unique<std::mutex>();
std::vector<HANDLE> threads;
std::vector<HANDLE> threadsEvents;
std::stack<std::size_t> freeThreads;
std::atomic_bool stopRequested { false };
void DoJob(Job job = {})
{
}
void ThreadedMethod(const std::size_t threadNumber)
{
while (!stopRequested) {
mutex->lock();
if (!jobs.empty()) {
//auto job = queue.front();
//queue.pop();
mutex->unlock();
DoJob(/*std::move(job)*/);
} else {
freeThreads.push(threadNumber);
mutex->unlock(); // 1)
// Unlocking before thread suspended.
::SetEvent(threadsEvents[threadNumber]); // Notify that thread will be suspended
::SuspendThread(threads[threadNumber]); // 2)
}
}
}
void addJob(Job job){
std::lock_guard lock {*mutex};
jobs.push(job);
// activate first available free thread
// ResumeThread(freeThreads.top());
// ...
}
/*
// Creating threads
//for (std::size_t i = 0; i < 10; ++i) {
// Concidering the 'threadNumber' is stored somewhere.
// threads[i] = CreateThread(NULL, 0, ThreadedMethod, (LPVOID)&threadNumber, CREATE_SUSPENDED, NULL);
//}
*/
In this snippet the ThreadedMethod
which runs asynchronously suspend its thread and I afraid that something may happen between lines marked 1)
and 2)
, for example a new job may arrive and this can produce inconsistent state. So I'm looking a way to somehow suspend the thread and unlock mutex "at the same time".
Now I'm looking how WinApi events can be utilized to solve the problem, but I'm also interesting how to solve it using standard C++ features.
You can use semaphore primitives. Semaphores use an unsigned counter and a process queue and support 2 operations that atomically increment and decrement the counter. If the decrement of a counter would make it go below 0 then the thread calling wait()
suspends.
To perform this you need 2 semaphores, free_slots(size_queue)
, available(0)
. When a thread calls addJob
waits on the free_slots
(if queue is full then suspends) and perform post()
on the available
semaphore, signaling an eventual suspended consumer.
Note that mutex are still necessary since the queue operations must be performed sequentially.
//...
#include <semaphore.h>
//initialize the semaphore to 0
std::counting_semaphore<size_t> free_slots(size_queue);
std::counting_semaphore<size_t> available(0);
std::queue<Job> jobs;
std::unique_ptr<std::mutex> mutex = std::make_unique<std::mutex>();
void ThreadMethod(const size_t tid){
while(!stopRequeust){
Job *job;
available.wait(); //if queue is empty then wait
if(stopRequest) //exit
mutex->lock();
job = queue.front();
queue.pop();
mutex->unlock();
free_slots.post(); //signal that a slot is free
//execute job;
}
}
void addJob(Job job){
free_slots.wait(); //if queue is full then wait
mutex->lock();
jobs.push(job);
mutex->unlock();
available.post(); //signal that a job is available
}
You can get similar results using condition variables.