I have some code that previously looked like:
std::vector<MyObject> data;
int remaining;
do { // May run 100 times or more
    prepare();
    remaining = compute1(data);
    remaining -= compute2(data);
    cleanup(data);
} while (remaining > 0);
Where each function prepare(), compute1(), compute2() and cleanup() is somewhat compute-intensive and parallelized with big for loops using #pragma omp parallel for schedule(static).
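For illustration (this is not the actual code), compute1() in this original version might look roughly like the sketch below; step1() is a hypothetical placeholder for the real per-item work:

// Hypothetical sketch of the baseline: each helper spawns its own parallel
// loop, so every call pays a fork/join plus the implicit end-of-loop barrier.
int compute1(std::vector<MyObject>& data) {
    int remaining = 0;
    #pragma omp parallel for schedule(static) reduction(+ : remaining)
    for (int i = 0; i < (int)data.size(); i++) {
        remaining += data[i].step1(); // step1() stands in for the real work
    }
    return remaining;
}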
Some benchmarking showed significant time spent synchronizing the threads, so I considered switching to a single parallel region and distributing the data between the threads. The code would look like:
std::vector<MyObject> data;
std::atomic<int> remaining;
#pragma omp parallel
{
    std::vector<MyObject> my_data;
    #pragma omp for schedule(static)
    for (int i = 0; i < data.size(); i++) {
        my_data.push_back(data[i]);
    }
    do { // May run 100 times or more
        prepare();
        remaining = 0;
        remaining += compute1(my_data);
        remaining -= compute2(my_data);
        cleanup(my_data);
    } while (remaining > 0);
}
Of course, there are some synchronization constraints to consider, the first 3 being somewhat obvious (the annotated loop below marks where each one applies):

1. data must go through the compute1(), compute2() and cleanup() functions in order and before the next iteration.
2. Any thread may run the remaining = 0 instruction, but no thread should run it after the first thread has run the remaining += compute1() instruction.
3. No thread should test the remaining > 0 condition before all threads have run the remaining -= compute2() instruction.
4. No thread should run the remaining = 0 instruction before all threads have tested the previous remaining > 0 condition.
5. No thread should run the prepare() instruction before all threads have run the previous remaining -= compute2() instruction.
6. No thread should run the remaining += compute1() instruction before all threads have run the prepare() instruction.
7. No thread should run the remaining -= compute2() instruction before all threads have run the remaining += compute1() instruction (that's the reason there are 2 compute functions).

Constraint #1 is automatically satisfied since I split the items in data into per-thread my_data vectors.
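To see where each constraint bites, here is the loop body again with the constraint numbers as comments (annotation only, no new code):

do { // May run 100 times or more
    prepare();                      // #5: only after every thread's previous compute2()
                                    // #6: all threads must finish this before any compute1()
    remaining = 0;                  // #2: must run before the first remaining += compute1()
                                    // #4: only after every thread tested the previous remaining > 0
    remaining += compute1(my_data); // #7: all threads must finish this before any compute2()
    remaining -= compute2(my_data); // #3: all threads must finish this before any thread
                                    //     tests remaining > 0
    cleanup(my_data);               // #1: per-thread order, handled by the my_data split
} while (remaining > 0);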
Constraint #7 looks straightforward: a standard barrier would do fine.
Constraint #2 is also easy to satisfy: the remaining = 0 instruction can be run with #pragma omp single.
Of course, I can also implement the other constraints using barriers:
std::vector<MyObject> data;
std::atomic<int> remaining;
#pragma omp parallel
{
    std::vector<MyObject> my_data;
    #pragma omp for schedule(static)
    for (int i = 0; i < data.size(); i++) {
        my_data.push_back(data[i]);
    }
    do { // May run 100 times or more
        prepare();
        #pragma omp barrier
        #pragma omp single
        remaining = 0;
        remaining += compute1(my_data);
        #pragma omp barrier
        remaining -= compute2(my_data);
        cleanup(my_data);
        #pragma omp barrier
    } while (remaining > 0);
}
When I benchmark this, it shows a significant improvement compared to the previous version where each function was independently parallelized. The same simulation drops from 193 s to 159 s, an interesting 17.7 % speed improvement. My intuition is that each thread keeping its own data improves data locality and the hit rate of the local CPU/core cache.
But I think this still enforces more constraints than necessary. If I had some kind of “barriered section” that no thread can leave before all threads have entered it, I could write something much less constrained, like:
std::vector<MyObject> data;
std::atomic<int> remaining;
#pragma omp parallel
{
    std::vector<MyObject> my_data;
    #pragma omp for schedule(static)
    for (int i = 0; i < data.size(); i++) {
        my_data.push_back(data[i]);
    }
    do { // May run 100 times or more
        #pragma omp barriered section // Hypothetical construct, does not exist in OpenMP
        {
            prepare();
        }
        #pragma omp single
        remaining = 0;
        remaining += compute1(my_data);
        #pragma omp barrier
        remaining -= compute2(my_data);
        #pragma omp barriered section // Hypothetical construct, does not exist in OpenMP
        {
            cleanup(my_data);
        }
    } while (remaining > 0);
}
Is there a good way to implement such a thing using OpenMP features and/or C++ synchronization features?
I guess it should be feasible with low-level mutexes, locks and condition variables, but I’d rather not write such bug-prone code myself if there is some good, well-known way to do it.
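For reference, a minimal sketch of such a hand-rolled “barriered section”, assuming the thread count is fixed and known at construction (BarrieredSection, enter() and leave() are hypothetical names), could look like this:

#include <condition_variable>
#include <mutex>

// Minimal sketch: threads may enter the section at any time, but none may
// leave it until all num_threads threads have entered. The generation
// counter makes it reusable across iterations.
class BarrieredSection {
    std::mutex m;
    std::condition_variable cv;
    const int num_threads;
    int entered = 0;
    unsigned long generation = 0;
public:
    explicit BarrieredSection(int n) : num_threads(n) {}

    // Call when entering the section; returns the generation observed on entry.
    unsigned long enter() {
        std::lock_guard<std::mutex> lock(m);
        unsigned long gen = generation;
        if (++entered == num_threads) { // last thread to enter opens the exit
            entered = 0;
            ++generation;
            cv.notify_all();
        }
        return gen;
    }

    // Call when leaving the section; blocks until all threads have entered.
    void leave(unsigned long gen) {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [&] { return generation != gen; });
    }
};

Each thread would call auto token = section.enter(); right after entering the section and section.leave(token); right before leaving it.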
EDIT:
Thanks to Joachim’s good suggestions in the comments, I was able to avoid some unnecessary barriers. The code now looks like:
std::vector<MyObject> data;
std::atomic<int> remaining[2] = {1, 1}; // Double-buffered so resetting one iteration’s counter cannot race with testing the previous one
#pragma omp parallel
{
    std::vector<MyObject> my_data;
    #pragma omp for schedule(static)
    for (int i = 0; i < data.size(); i++) {
        my_data.push_back(data[i]);
    }
    prepare();
    #pragma omp barrier // In fact, “prepare()” ends with a “#pragma omp for” loop whose implicit barrier makes this explicit barrier redundant
    for (int iter = 0; remaining[(iter + 1) % 2] > 0; iter++) { // May run 100 times or more; (iter + 1) % 2 has the same parity as iter - 1 but avoids a negative index at iter == 0
        int local_remaining = compute1(my_data);
        #pragma omp single nowait
        remaining[iter % 2] = 0;
        #pragma omp barrier // Dropping the “nowait” above and relying on the implicit barrier at the end of “#pragma omp single” might be enough…
        local_remaining -= compute2(my_data);
        remaining[iter % 2] += local_remaining;
        cleanup(my_data);
        #pragma omp barrier // For constraint #5, either here or before “cleanup(my_data)”, but I’d rather use a “split barrier”/“barriered section” to relax the synchronization
        prepare();
        #pragma omp barrier // In fact, “prepare()” ends with a “#pragma omp for” loop whose implicit barrier makes this explicit barrier redundant
    }
}
C++20 introduced std::barrier, which can be used in a split way, since it has separate arrive() and wait() functions.
The problem is how to construct a single barrier object: std::barrier's constructor needs the number of threads, but you only know that once you are inside the #pragma omp parallel region. One trick is to create an empty std::optional<std::barrier<>> before the parallel region and use #pragma omp single to have one thread create the actual std::barrier; the implicit barrier at the end of the single construct guarantees it is ready before any thread uses it. It could look like this:
#include <barrier>
#include <omp.h>
#include <optional>

std::optional<std::barrier<>> barrier; // empty for now; the thread count is not known yet

#pragma omp parallel
{
    #pragma omp single
    barrier.emplace(omp_get_num_threads()); // implicit barrier at the end of “single”

    for (…) {
        compute1(…);
        auto arrive_token = barrier->arrive();  // signal this phase, don’t block yet
        compute2(…);                            // overlaps with other threads’ compute1()
        barrier->wait(std::move(arrive_token)); // block until all threads have arrived
        compute3(…);
    }
}
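Applied to the question's loop, the asked-for “barriered section” semantics (no thread may leave before all threads have entered) fall out of the split API directly; a sketch:

// “Barriered section” around cleanup(): arriving marks the entry, and
// waiting on the token at the end blocks until everyone has entered.
auto token = barrier->arrive();  // enter the section
cleanup(my_data);                // section body
barrier->wait(std::move(token)); // leave only once all threads have entered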