c++concurrencystd-future

std::future get() blocks when wait_for() status is ready and wait() returns


I expect the below code to pass all assertions and complete successfully every time. Currently it seems std::future.get() blocks in both branches everytime. It blocks forever despite wait_for() showing the status as ready and wait() returning immediately. Same result for gcc 7.4.0 & clang 6.0.0.

#include <chrono>
#include <condition_variable>
#include <future>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>

#include <cassert>
#include <unistd.h>


template<class T>
class BlockingQueue {
  std::queue<T> theQueue;
  std::mutex mtx;
  std::condition_variable hasDataCondition;

public:
  void push(const T& t) {
    std::unique_lock<std::mutex> lock{mtx};
    theQueue.push(t);
    hasDataCondition.notify_all();
  }

  T popWhenAvailable(int i = 0) {
    std::unique_lock<std::mutex> lock{mtx};
    if (theQueue.empty()) {
      std::cout << "Waiting " << i << std::endl;
      hasDataCondition.wait(lock, [this]{return not theQueue.empty();});
      std::cout << "Done waiting " << i << std::endl;
    }
    T front = std::move(theQueue.front());
    theQueue.pop();
    std::cout << "Got value " << front << " and popped it on " << i << std::endl;
    return front;
  }
};

int main(int argc, char** argv) {
  BlockingQueue<int> q;

  auto futureInt0 = std::async(std::launch::async, [&]{return q.popWhenAvailable();});
  auto futureInt1 = std::async(std::launch::async, [&]{return q.popWhenAvailable(1);});
  std::cout << "Starting threads..." << std::endl;
  sleep(2);

  assert(futureInt0.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);
  assert(futureInt1.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);

  std::cout << "Pushing data..." << std::endl;
  q.push(4);
  std::cout << "Pushed! Checking results..." << std::endl;

  if (futureInt0.wait_for(std::chrono::milliseconds(300)) == std::future_status::ready) {
    std::cout << "Future 0 ready." << std::endl;
    assert(futureInt1.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);
    std::cout << "Future 1 isn't ready (it shouldn't be)." << std::endl;

    std::cout << "Trying to wait() for future 0, should return immediately..." << std::endl;
    futureInt0.wait();
    std::cout << "Now get() the value..." << std::endl;
    assert(futureInt0.get() == 4);
  } else {
    std::cout << "Future 0 not ready. Trying future 1..." << std::endl;
    assert(futureInt1.wait_for(std::chrono::milliseconds(300)) == std::future_status::ready);

    std::cout << "Future1 status is ready. Trying to wait(), should return immediately..." << std::endl;
    futureInt1.wait();
    std::cout << "Now get() the value..." << std::endl;
    assert(futureInt1.get() == 4);
  }
}

Solution

  • Interesting! The first thing I found was that you are waiting on the second thread to have something to pop as pointed out by @rafix07. I'm not sure what the ultimate objective is but this works. I tested on MSVC and here it is with g++ on Coliru

    #include <chrono>
    #include <condition_variable>
    #include <future>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <cassert>
    
    
    template<class T>
    class BlockingQueue {
        std::queue<T> theQueue;
        std::mutex mtx;
        std::condition_variable hasDataCondition;
    
    public:
        void push(const T& t) {
            std::unique_lock<std::mutex> lock{ mtx };
            theQueue.push(t);
            hasDataCondition.notify_all();
        }
    
        T popWhenAvailable(int i) {
            std::unique_lock<std::mutex> lock{ mtx };
            std::cout << "popWhenAvailable: " << i << std::endl;
            if (theQueue.empty()) {
                std::cout << "Waiting " << i << std::endl;
                hasDataCondition.wait(lock, [this] {return ! theQueue.empty(); });
                std::cout << "Done waiting " << i << std::endl;
            }
            T front = std::move(theQueue.front());
            theQueue.pop();
            std::cout << "Got value " << front << " and popped it on " << i << std::endl;
            return front;
        }
    };
    
    int main(int argc, char** argv) {
        using namespace std::chrono_literals;
        BlockingQueue<int> q;
    
        auto futureInt0 = std::async(std::launch::async, [&] {return q.popWhenAvailable(0); });
        auto futureInt1 = std::async(std::launch::async, [&] {return q.popWhenAvailable(1); });
        std::cout << "Starting threads...\n" << std::endl;
        std::this_thread::sleep_for(1000ms);
    
        assert(futureInt0.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);
        assert(futureInt1.wait_for(std::chrono::milliseconds(300)) != std::future_status::ready);
    
        std::cout << "Pushing data..." << std::endl;
        q.push(4);
        std::cout << "Pushed! Checking results..." << std::endl;
    
        std::pair<bool, bool> done = { false,false };
        for (;;) {
            if (!done.first && futureInt0.wait_for(std::chrono::milliseconds(300)) == std::future_status::ready) {
                std::cout << "Future 0 ready." << std::endl;
                futureInt0.wait();
                std::cout << "Now get() the value 0: " << futureInt0.get() << std::endl;
                done.first = true;
            }
            else if(!done.second && futureInt1.wait_for(std::chrono::milliseconds(300)) == std::future_status::ready) {
                std::cout << "Future 1 ready." << std::endl;
                futureInt1.wait();
                std::cout << "Now get() the value 1: " << futureInt1.get() << std::endl;
                done.second = true;
            }
            if (done.first && done.second)
                break;
            else if(done.first || done.second)
                q.push(8);
        }
    }