I want a main thread which can give values to a number of subtasks (where the subtasks takes long time to execute) and where I do not want to end the subtasks and restart them with new values. I try to use promise and future, but going this route is going to be complicated, since the promise and futures is not "reusable" or in fact I can not do two future.get() in a row...
Here is a code example, but this is not executing since the two future.get() in the print_int-task will run directly after each other!
#include <iostream> // std::cout
#include <functional> // std::ref
#include <thread> // std::thread
#include <future> // std::promise, std::future
void print_int(std::future<int>& input_future, std::promise<void>& done_promise)
{
std::cout << "Print thread starts!" << std::endl;
int x = input_future.get();
while (x > 0)
{
std::cout << "Recived value: " << x;
done_promise.set_value();
std::cout << " done is set" << std::endl;
int x = input_future.get();
}
}
int main()
{
std::cout << "Main thread starts!" << std::endl;
std::promise<int> input_prom;
std::promise<void> done_promise;
std::future<void> done_future = done_promise.get_future();
std::future<int> input_fut = input_prom.get_future();
std::thread thred(print_int, std::ref(input_fut), std::ref(done_promise));
int counter = 10;
while (counter > 1)
{
input_prom.set_value(counter); // fulfill promise
//the print_int thread excecutes and prints
done_future.get(); // wait until its printed
counter--;
}
// (synchronizes with getting the future)
thred.join();
return 0;
}
this is a typical producer-consumer problem, you should send a new future for every task you need to execute, the "sending" should be done with a thread-safe queue, and the "returning" should be done using the promise as follows.
#include <iostream> // std::cout
#include <functional> // std::ref
#include <thread> // std::thread
#include <future> // std::promise, std::future
#include <queue>
#include <mutex>
#include <optional>
class SPSCQueue
{
public:
using OptionalPromisePair = std::optional<std::pair<int, std::promise<std::string>>>;
void put(OptionalPromisePair&& p)
{
{
std::lock_guard<std::mutex> lk(m_mut);
m_queue.push(std::move(p));
}
m_cv.notify_one();
}
OptionalPromisePair get()
{
if (this->m_queue.size() == 0)
{
std::unique_lock lk(m_mut);
m_cv.wait(lk, [this]{ return this->m_queue.size() != 0; });
}
std::lock_guard lk2(m_mut);
OptionalPromisePair ret = std::move(m_queue.front());
m_queue.pop();
return ret;
}
private:
std::queue<OptionalPromisePair> m_queue;
std::mutex m_mut;
std::condition_variable m_cv;
};
void print_int(SPSCQueue& queue)
{
std::cout << "Print thread starts!" << std::endl;
while (true)
{
SPSCQueue::OptionalPromisePair p = queue.get();
if (!p)
{
return;
}
std::cout << "Recived value: " << (*p).first;
(*p).second.set_value("val");
std::cout << " future is set" << std::endl;
}
}
int main()
{
std::cout << "Main thread starts!" << std::endl;
SPSCQueue queue;
std::thread thred(print_int, std::ref(queue));
int counter = 10;
while (counter > 1)
{
std::promise<std::string> promise;
std::future<std::string> done_future = promise.get_future();
queue.put({{counter, std::move(promise)}});
//the print_int thread excecutes and prints
std::string returned_val = std::move(done_future.get()); // wait until its printed
std::cout << "received " << returned_val << " in main thread\n";
counter--;
}
queue.put({}); // signal thread termination
thred.join();
return 0;
}
you can find other implementations of a thread-safe multiple consumers queue, or a non-blocking SPSCQueue, this bare-bones implementation is just for illustration.
also if you don't intend on returning a string you should use a smaller type than std::string
, maybe an int enum
, and you could wrap the thread and the queue in an RAII object that will signal termination and join the thread in the destructor to have exception safety.