multithreading, c++11, producer-consumer

Multithreaded Producer/Consumer in C++


I am learning multithreading and have written a basic producer/consumer. I have two issues with the code below. 1) Even with the consumer sleep time set lower than the producer sleep time, the producer still seems to execute more often. 2) In the consumer I have duplicated code to handle the case where the producer has finished adding to the queue but there are still elements in the queue. Any advice on a better way of structuring the code?

#include <chrono>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

class App {
private:
    std::queue<int> m_data;
    bool m_bFinished;
    std::mutex m_Mutex;
    int m_ConsumerSleep;
    int m_ProducerSleep;
    int m_QueueSize;
public:
    App(int &MaxQueue) :m_bFinished(false), m_ConsumerSleep(1), m_ProducerSleep(5), m_QueueSize(MaxQueue){}
    void Producer() {

        for (int i = 0; i < m_QueueSize; ++i) {
            std::lock_guard<std::mutex> guard(m_Mutex);
            m_data.push(i); 
            std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(m_ProducerSleep));
        }
        m_bFinished = true;
    }

    void Consumer() {
        while (!m_bFinished) {
            if (m_data.size() > 0) {
                std::lock_guard<std::mutex> guard(m_Mutex);
                std::cout << "Consumer Thread, queue element: " << m_data.front() << " size: " << m_data.size() << std::endl;
                m_data.pop();
            }
            else {
                std::cout << "No elements, skipping" << std::endl;
            }
            std::this_thread::sleep_for(std::chrono::seconds(m_ConsumerSleep));
        }
        while (m_data.size() > 0) {
            std::lock_guard<std::mutex> guard(m_Mutex);
            std::cout << "Emptying remaining elements " << m_data.front() << std::endl;
            m_data.pop();
            std::this_thread::sleep_for(std::chrono::seconds(m_ConsumerSleep));
        }
    }

};


int main()
{
    int QueueElements = 10;
    App app(QueueElements);
    std::thread consumer_thread(&App::Consumer, &app);
    std::thread producer_thread(&App::Producer, &app);

    producer_thread.join();
    consumer_thread.join();


    std::cout << "loop exited" << std::endl;
    return 0;
}

Solution

  • First, you should use a condition variable instead of polling with a delay in the consumer. This way, the consumer thread only wakes up when the producer notifies it that the queue is no longer empty.

    That said, the reason your producer appears to run more often is where its delay sits. sleep_for is called while the lock_guard is still alive, so the producer holds the mutex for the whole sleep and the consumer cannot acquire it until the delay is over. Once the loop body ends, the producer often re-acquires the mutex straight away on the next iteration. You should release the mutex before calling sleep_for:

    for (int i = 0; i < m_QueueSize; ++i) {
        /* Introduce a scope to release the mutex before sleeping */
        {
            std::lock_guard<std::mutex> guard(m_Mutex);
            m_data.push(i);
            std::cout << "Producer Thread, queue size: " << m_data.size() << std::endl;
        } // Mutex is released here
        std::this_thread::sleep_for(std::chrono::seconds(m_ProducerSleep));
    }
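To illustrate the condition variable suggestion, here is a minimal sketch of how the class could be reworked. It is one possible arrangement, not the only one. The member `m_Cond`, the `run_demo` helper, and the choice to have `Consumer` return the consumed values are assumptions added for illustration; the sleeps and the console output from the question are left out to keep it short. Note that `m_bFinished` and the queue are only touched while the mutex is held, whereas the original reads both without any synchronization, which is a data race. A single wait loop also covers the "producer finished but items remain" case, so the duplicated draining code is no longer needed.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Sketch only: a reworked version of the App class from the question.
class App {
    std::queue<int> m_data;
    bool m_bFinished = false;        // guarded by m_Mutex
    std::mutex m_Mutex;
    std::condition_variable m_Cond;  // assumed new member, not in the original
    int m_QueueSize;

public:
    explicit App(int maxQueue) : m_QueueSize(maxQueue) {}

    void Producer() {
        for (int i = 0; i < m_QueueSize; ++i) {
            {
                std::lock_guard<std::mutex> guard(m_Mutex);
                m_data.push(i);
            }                      // mutex released before notifying
            m_Cond.notify_one();   // wake the consumer if it is waiting
        }
        {
            std::lock_guard<std::mutex> guard(m_Mutex);
            m_bFinished = true;    // written under the lock
        }
        m_Cond.notify_one();       // let the consumer observe the finish flag
    }

    // Returns the values in the order they were consumed (illustrative choice).
    std::vector<int> Consumer() {
        std::vector<int> consumed;
        std::unique_lock<std::mutex> lock(m_Mutex);
        for (;;) {
            // Sleeps until there is data or the producer is done;
            // the predicate also protects against spurious wakeups.
            m_Cond.wait(lock, [this] { return !m_data.empty() || m_bFinished; });
            if (m_data.empty()) {
                break;             // finished and fully drained
            }
            int value = m_data.front();
            m_data.pop();
            lock.unlock();         // do the "work" without holding the mutex
            consumed.push_back(value);
            lock.lock();
        }
        return consumed;
    }
};

// Hypothetical helper that runs one producer and one consumer to completion.
std::vector<int> run_demo(int count) {
    App app(count);
    std::vector<int> result;
    std::thread consumer([&] { result = app.Consumer(); });
    std::thread producer(&App::Producer, &app);
    producer.join();
    consumer.join();
    return result;
}
```

Calling `run_demo(10)` from `main` should hand back the ten values in the order they were pushed, since there is a single producer and a single consumer. With several consumers, `notify_all` for the finish signal would be the safer choice.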