c++winapi

How to simultaneously suspend thread and unlock mutex from this thread


I have a queue of jobs that should be done asynchronously (actually send HTTP requests, if it matters) and I want an executer thread to suspend until new job arrives if the queue is empty. I hope the following snippet will explain what I want:

#include <Windows.h>

#include <iostream>
#include <vector>
#include <mutex>
#include <memory>
#include <stack>
#include <queue>

class Job{
};

std::queue<Job> jobs;
std::unique_ptr<std::mutex> mutex = std::make_unique<std::mutex>();
std::vector<HANDLE>         threads;
std::vector<HANDLE>         threadsEvents;
std::stack<std::size_t>     freeThreads;
std::atomic_bool            stopRequested { false };

void DoJob(Job job = {})
{
}

void ThreadedMethod(const std::size_t threadNumber)
{
    while (!stopRequested) {
        mutex->lock();
        if (!jobs.empty()) {
            //auto job = queue.front();
            //queue.pop();
            mutex->unlock();
            DoJob(/*std::move(job)*/);
        } else {
            freeThreads.push(threadNumber);
            mutex->unlock(); // 1)
            // Unlocking before thread suspended.
            ::SetEvent(threadsEvents[threadNumber]); // Notify that thread will be suspended
            ::SuspendThread(threads[threadNumber]); // 2)
        }
    }
}

void addJob(Job job){
    std::lock_guard lock {*mutex};
    jobs.push(job);
    // activate first available free thread
    // ResumeThread(freeThreads.top());
    // ...
}

/*
// Creating threads
//for (std::size_t i = 0; i < 10; ++i) {
//    Concidering the 'threadNumber' is stored somewhere.
//    threads[i] = CreateThread(NULL, 0, ThreadedMethod, (LPVOID)&threadNumber, CREATE_SUSPENDED, NULL);
//}
*/

In this snippet the ThreadedMethod which runs asynchronously suspend its thread and I afraid that something may happen between lines marked 1) and 2), for example a new job may arrive and this can produce inconsistent state. So I'm looking a way to somehow suspend the thread and unlock mutex "at the same time".

Now I'm looking how WinApi events can be utilized to solve the problem, but I'm also interesting how to solve it using standard C++ features.


Solution

  • You can use semaphore primitives. Semaphores use an unsigned counter and a process queue and support 2 operations that atomically increment and decrement the counter. If the decrement of a counter would make it go below 0 then the thread calling wait() suspends.

    To perform this you need 2 semaphores, free_slots(size_queue), available(0). When a thread calls addJob waits on the free_slots (if queue is full then suspends) and perform post() on the available semaphore, signaling an eventual suspended consumer.

    Note that mutex are still necessary since the queue operations must be performed sequentially.

    //...
    #include <semaphore.h>
    
    //initialize the semaphore to 0
    std::counting_semaphore<size_t> free_slots(size_queue);
    std::counting_semaphore<size_t> available(0);
    std::queue<Job> jobs;
    std::unique_ptr<std::mutex> mutex = std::make_unique<std::mutex>();
    
    
    void ThreadMethod(const size_t tid){
        while(!stopRequeust){
            Job *job;
            available.wait();   //if queue is empty then wait
                if(stopRequest) //exit
                mutex->lock();
                    job = queue.front();
                    queue.pop();
                mutex->unlock();
            free_slots.post();  //signal that a slot is free
            //execute job;
        }
    }
    
    void addJob(Job job){
        free_slots.wait();  //if queue is full then wait
            mutex->lock();
                jobs.push(job);
            mutex->unlock();
        available.post();   //signal that a job is available
    }
    

    You can get similar results using condition variables.