I have an async function where I need to return the best result (the chain with the maximum score) from results (other chains and their scores) computed on threads in a boost::asio::thread_pool. Since I need to return only one chain, the other chains don't have to be handled outside the thread pool, and as I understand it I only need to synchronize the place where max_score and best_chain are updated.
#include <iostream>
#include <vector>
#include <mutex>
#include <boost/asio.hpp>
#include <boost/asio/thread_pool.hpp>
int GetBestChainAsync(const std::vector<std::vector<float>>& data)
{
    std::vector<float> best_chain;
    int n{5};
    float max_score{};
    std::mutex mutex;
    boost::asio::thread_pool pool(n);
    for (int i = 0; i < n; i++) {
        boost::asio::post(pool, [&best_chain, &data, &max_score, &mutex, i]() {
            std::vector<float> chain = GetChain(data, i);
            float score = SimilarityScore(chain, data);
            std::scoped_lock lock(mutex);
            if (score > max_score) {
                max_score = score;
                best_chain = chain;
            }
        });
    }
    pool.join();
    for (const auto& c: best_chain) {
        std::cout << c << std::endl;
    }
}
Is this logic correct? Is it possible to somehow use asio::strand here instead of std::mutex and simplify the function?
From the key sentence "instead of std::mutex" I get the sense that you're trying to make this lock-free.
You can use atomics, e.g.:
#include <atomic>
#include <boost/asio.hpp>
#include <boost/core/ignore_unused.hpp>
#include <cmath>
#include <iostream>
#include <limits>
#include <vector>
namespace asio = boost::asio;
using Value = float;
using Chain = std::vector<Value>;
using Chains = std::vector<Chain>;
Chain const& GetChain(Chains const& data, int i) { return data.at(i); }
Value SimilarityScore(Chain const& chain, Chains const& data) {
    boost::ignore_unused(chain, data);
    return 0.0; // TODO
}
int GetBestChainAsync(Chains const& data) {
    struct Best {
        int id = -1;
        // lowest(), not min(): for floating-point types min() is the smallest positive value
        Value score = std::numeric_limits<Value>::lowest();
    };
    std::atomic<Best> best{Best{}}; // explicitly start from the defaults above
    {
        constexpr int n{5};
        asio::thread_pool pool(n);
        for (int i = 0; i < n; i++) {
            asio::post(pool, [&data, &best, i]() {
                Best const new_{i, SimilarityScore(GetChain(data, i), data)};
                // a failed compare_exchange_strong reloads tmp, so the score test is repeated
                for (auto tmp = best.load(); new_.score > tmp.score;)
                    if (best.compare_exchange_strong(tmp, new_))
                        break;
            });
        }
        pool.join();
    }
    auto result = best.load();
    for (auto const& c : GetChain(data, result.id))
        std::cout << c << std::endl;
    return result.id;
}
It might be easier to read with atomic_ref (c++20). I kinda like atomic<T> because it makes it hard to forget to use atomics.
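To see the update loop in isolation, here is a minimal sketch of the same compare-exchange pattern using only the standard library. It assumes plain std::thread workers instead of the Asio pool and precomputed scores instead of SimilarityScore; the names `offer` and `best_index` are made up for this illustration.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <limits>
#include <thread>
#include <vector>

// Illustrative record: index of a candidate and its score.
struct Best {
    int   id    = -1;
    float score = std::numeric_limits<float>::lowest();
};

// Publish `candidate` only if it beats the currently stored best.
inline void offer(std::atomic<Best>& best, Best candidate) {
    Best seen = best.load();
    // When the exchange fails, `seen` is refreshed with the stored value,
    // so the score comparison is re-run against what another thread wrote.
    while (candidate.score > seen.score &&
           !best.compare_exchange_weak(seen, candidate)) {
    }
}

// Returns the index of the largest score, or -1 if `scores` is empty.
// One thread per score keeps the sketch short; it is not a tuning advice.
inline int best_index(std::vector<float> const& scores) {
    // Ids are stored as int, so the input must fit into that range.
    assert(scores.size() <= static_cast<std::size_t>(std::numeric_limits<int>::max()));
    std::atomic<Best> best{Best{}};
    std::vector<std::thread> workers;
    workers.reserve(scores.size());
    for (int i = 0; i < static_cast<int>(scores.size()); ++i)
        workers.emplace_back([&best, &scores, i] { offer(best, Best{i, scores[i]}); });
    for (auto& w : workers)
        w.join();
    return best.load().id;
}
```

In this sketch all-negative scores still produce a winner because the initial score is lowest(). If two candidates tie, whichever was published first stays, so the result among equal scores depends on scheduling.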
Asio doesn't change any of that. I have a strong suspicion that using parallel std::for_each (c++17) will be superior here, because you don't need any of the IO oriented services implied by Asio's execution context.
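As a rough illustration of doing this without an Asio execution context, the sketch below launches one std::async task per chain and reduces the results on the calling thread, so no mutex or atomic is needed. Note that it swaps in std::async for the parallel std::for_each mentioned above, since parallel algorithm support varies between toolchains. The placeholder SimilarityScore (sum of the values) and the name BestChainIndex are assumptions made for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <numeric>
#include <vector>

using Chain  = std::vector<float>;
using Chains = std::vector<Chain>;

// Placeholder score (an assumption for this sketch): the sum of the chain's values.
inline float SimilarityScore(Chain const& chain, Chains const& /*data*/) {
    return std::accumulate(chain.begin(), chain.end(), 0.0f);
}

// Returns the index of the highest-scoring chain; `data` must not be empty.
inline std::size_t BestChainIndex(Chains const& data) {
    assert(!data.empty());
    std::vector<std::future<float>> scores;
    scores.reserve(data.size());
    for (Chain const& chain : data)
        scores.push_back(std::async(std::launch::async, [&chain, &data] {
            return SimilarityScore(chain, data);
        }));

    // Reduce on the calling thread; each task only returns its own value.
    std::size_t best = 0;
    float best_score = scores[0].get();
    for (std::size_t i = 1; i < scores.size(); ++i) {
        float const score = scores[i].get();
        if (score > best_score) {
            best_score = score;
            best = i;
        }
    }
    return best;
}
```

Because the comparison happens in index order on one thread, ties resolve to the lowest index, which makes the result deterministic.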
Also, depending on the nature of similarity scores you can save a lot by improving the algorithm and perhaps doing less work based on it in the first place.