c++ multithreading boost-asio

Synchronization in boost::asio::thread_pool to return the best result


I have an async function that needs to return the best result (the chain with the maximum score) from results (other chains and their scores) computed on the threads of a boost::asio::thread_pool. Since I need to return only one chain, the other chains don't have to be handled outside the thread pool, and as I understand it, I only need to synchronize the place where max_score and best_chain are updated.

#include <iostream>
#include <vector>
#include <mutex>
#include <boost/asio.hpp>
#include <boost/asio/thread_pool.hpp>

int GetBestChainAsync(const std::vector<std::vector<float>>& data)
{
  std::vector<float> best_chain;
  int n{5};
  float max_score{};
  std::mutex mutex;
  boost::asio::thread_pool pool(n);
  for (int i = 0; i < n; i++) {
    boost::asio::post(pool, [&best_chain, &data, &max_score, &mutex, i]() {
      std::vector<float> chain = GetChain(data, i);
      float score = SimilarityScore(chain, data);
      std::scoped_lock lock(mutex);
      if (score > max_score) {
        max_score = score;
        best_chain = chain;
      }
    });
  }
  pool.join();
  for (const auto& c: best_chain) {
    std::cout << c << std::endl;
  }
}

Is this logic correct? Is it possible to use an asio::strand here instead of std::mutex and simplify the function?


Solution

  • From the key sentence "instead of std::mutex" I get the sense that you're trying to make this lock-free.

    You can use atomics, for example:

    #include <atomic>
    #include <boost/asio.hpp>
    #include <boost/core/ignore_unused.hpp>
    #include <cmath>
    #include <iostream>
    #include <limits>
    #include <vector>
    namespace asio = boost::asio;
    
    using Value = float;
    using Chain  = std::vector<Value>;
    using Chains = std::vector<Chain>;
    
    Chain const& GetChain(Chains const& data, int i) { return data.at(i); }
    
    Value SimilarityScore(Chain const& chain, Chains const& data) {
        boost::ignore_unused(chain, data);
        return 0.0; // TODO
    }
    
    int GetBestChainAsync(Chains const& data) {
        struct Best {
            int   id    = -1;
            Value score = std::numeric_limits<Value>::lowest(); // min() would be the smallest positive float
        };
        std::atomic<Best> best{Best{}}; // explicit init: before C++20 a default-constructed atomic is uninitialized
    
        {
            constexpr int     n{5};
            asio::thread_pool pool(n);
    
            for (int i = 0; i < n; i++) {
                asio::post(pool, [&data, &best, i]() {
                    Best const new_{i, SimilarityScore(GetChain(data, i), data)};
    
                    for (auto tmp = best.load(); new_.score > tmp.score;)
                        if (best.compare_exchange_strong(tmp, new_))
                            break;
                });
            }
            pool.join();
        }
    
        auto result = best.load();
        for (auto const& c : GetChain(data, result.id))
            std::cout << c << std::endl;
    
        return result.id;
    }
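
    To see the compare-exchange loop in isolation, here is a minimal sketch without Asio, using plain std::thread. UpdateBest and IndexOfMaxScore are hypothetical names, and the scores are passed in directly instead of being computed by SimilarityScore, so treat it as an illustration of the technique rather than a drop-in replacement.

    ```cpp
    #include <atomic>
    #include <cstddef>
    #include <limits>
    #include <thread>
    #include <vector>

    // Hypothetical record; it must be trivially copyable to live in std::atomic.
    struct Best {
        int   id    = -1;
        float score = std::numeric_limits<float>::lowest();
    };

    // Publish `candidate` only if it beats the value currently stored in `best`.
    inline void UpdateBest(std::atomic<Best>& best, Best candidate) {
        Best seen = best.load();
        while (candidate.score > seen.score) {
            // On failure `seen` is refreshed with the current value,
            // and the loop condition is checked again.
            if (best.compare_exchange_weak(seen, candidate))
                break;
        }
    }

    // Starts one thread per score and returns the index of the maximum
    // (-1 if `scores` is empty).
    inline int IndexOfMaxScore(std::vector<float> const& scores) {
        std::atomic<Best> best{Best{}};
        std::vector<std::thread> workers;
        for (std::size_t i = 0; i < scores.size(); ++i)
            workers.emplace_back([&best, &scores, i] {
                UpdateBest(best, Best{static_cast<int>(i), scores[i]});
            });
        for (auto& w : workers)
            w.join();
        return best.load().id;
    }
    ```

    For example, IndexOfMaxScore({0.5f, 2.0f, -1.0f, 1.5f}) should yield index 1 regardless of the order in which the threads finish, because a candidate only replaces the stored value when its score is strictly greater.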
    

    It might be easier to read with atomic_ref (c++20). I kinda like atomic<T> because it makes it hard to forget to use atomics.

    Other Thoughts

    Asio doesn't change any of that. I have a strong suspicion that using parallel std::for_each (c++17) will be superior here, because you don't need any of the IO oriented services implied by Asio's execution context.
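
    A rough sketch of that idea, assuming a placeholder score (the sum of a chain's elements) in place of the real SimilarityScore: a parallel std::for_each fills one score slot per chain, and the maximum is picked afterwards on the calling thread, so no mutex or atomic is involved. PlaceholderScore and BestChainIndex are hypothetical names. Note that some standard libraries need an extra backend to actually run in parallel (libstdc++ typically uses TBB).

    ```cpp
    #include <algorithm>
    #include <cstddef>
    #include <execution>
    #include <iterator>
    #include <numeric>
    #include <vector>

    using Chain  = std::vector<float>;
    using Chains = std::vector<Chain>;

    // Placeholder score (an assumption): the sum of the chain's elements.
    inline float PlaceholderScore(Chain const& chain) {
        return std::accumulate(chain.begin(), chain.end(), 0.0f);
    }

    // Returns the index of the best-scoring chain, or -1 if `data` is empty.
    inline int BestChainIndex(Chains const& data) {
        std::vector<std::size_t> ids(data.size());
        std::iota(ids.begin(), ids.end(), std::size_t{0});

        std::vector<float> scores(data.size());
        // Each iteration writes only its own slot, so no locking is needed.
        std::for_each(std::execution::par, ids.begin(), ids.end(),
                      [&](std::size_t i) { scores[i] = PlaceholderScore(data[i]); });

        if (scores.empty())
            return -1;
        auto it = std::max_element(scores.begin(), scores.end());
        return static_cast<int>(std::distance(scores.begin(), it));
    }
    ```

    The reduction step is sequential here, which is usually fine when scoring dominates the cost; the synchronization question from the original code disappears because the threads never touch shared state.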

    Also, depending on the nature of the similarity scores, you may save a lot by improving the algorithm so that it does less work in the first place.