I am learning about C++ std::thread
API and I am writing a naive parallel implementation of std::accumulate
. The idea of the exercise is simply to build a toy example, and become more proficient at writing multi-threaded code.
std::accumulate
in the standard template library properly deduces types, without any type hints. Do you guys have any suggestions to improve my code, so I can achieve something similar?C:\Data\dev\parallel_accumulate\include\parallel_accumulate.h(54,38): error C2440:'<function-style-cast>':
cannot convert from 'initializer list' to 'std::thread'
[C:\Data\dev\parallel_accumulate\build\parallel_accumulate.vcxproj]
./include/parallel_accumulate.h
#ifndef PARALLEL_ACCUMULATE_H
#define PARALLEL_ACCUMULATE_H
#include <numeric>
#include <functional>
#include <iterator>
#include <thread>
#include <vector>
template <typename T, typename Iterator, typename Container>
T sequential_fold(Iterator first, Iterator last, T init, T& result)
{
result = std::accumulate(first, last, init);
};
template <typename T_elem, typename T_accum, typename Iterator, typename Container>
void sequential_fold(Iterator first, Iterator last, T_accum init, std::function<T_accum(T_elem, T_accum)> func, T_accum& result)
{
result = std::accumulate(first, last, init, func);
};
template <typename T_elem, typename T_accum, typename Iterator, typename Container>
T_accum parallel_accumulate(Iterator first, Iterator last, T_elem init, std::function<T_accum(T_elem, T_accum)> func)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned const int min_block_size = 25;
// This bit is a clever piece of code. To calculate the maximum number of threads that should be spawned
// you can divide the total number of elements (length) / by the minimum block size. If the container
// has #elements in the interval [1,min_block_size), then atleast 1 thread will be spawned.
unsigned long const max_threads = static_cast<unsigned long const>((length + min_block_size - 1)/min_block_size);
//The actual number of threads to run is the minimum of your calculated maximum and the
//number of hardware threads. You don't want to run more threads than the hardware can support(oversubscription),
//because context switching will mean that more threads will decrease the performance.
// If the call to std::threads::hardware_concurrency() returned 0, you'd substitute a number of
// your choice; in this case, I have chosen 2. You don't want to run too many threads,
// because that would slow things down on a single-core machine, but likewise you don't want
// to run too few threads, you'd be passing up on available concurrency.
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads == 0 ? 2 : hardware_threads, max_threads);
unsigned long const block_size = static_cast<unsigned long const>(length / num_threads);
// Now, that we know how many threads we have, we can create a vector<T_accum> object for the intermediate
// results and a std::vector<std::thread> for the threads.
std::vector<T_accum> results(num_threads);
std::vector<std::thread> threads;
Iterator it = first;
for(int i{0};i<num_threads; ++i)
{
threads.push_back(std::thread(sequential_fold, it, it + block_size, func, std::ref(results[i]))) ;
it += block_size;
}
for(auto& t : threads)
t.join();
return std::accumulate(results.begin(), results.end(), init, func);
}
#endif
./src/main.cpp
#include <iostream>
#include "parallel_accumulate.h"
#include <vector>
int main()
{
std::vector<double> v(50, 1);
std::function<double(double, double)> sum = [](double element, double accum){
return accum + element;
};
auto it = v.begin();
double result = parallel_accumulate<double, double, std::vector<double>::iterator, std::vector<double>>(v.begin(), v.end(), 0.0, sum);
return 0;
}
./CMakeLists.txt
cmake_minimum_required(VERSION 3.30.3)
project(parallel_accumulate)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)
set(SOURCES
src/main.cpp
)
include_directories(include)
add_executable(parallel_accumulate ${SOURCES})
You need to instantiate sequential_fold
before providing to the std::thread
, i.e.:
threads.push_back (std::thread (
sequential_fold<T_elem, T_accum, Iterator, void>,
it,
it + block_size,
T_accum{}, // provide an init value
func,
std::ref(results[i])
));
By the way, don't forget to provide an argument for the init
parameter of sequential_fold
. And also the last template parameter Container
doesn't seem to be used.