I'm trying to parallelize code that should be embarrassingly parallel and it just seems to get slower the more processes I use. Here is a minimally (dys)functional example:
import os
import time
import random
import multiprocessing
from multiprocessing import Pool, Manager, Process
import numpy as np
import pandas as pd
def pool_func(
number: int,
max_number: int
) -> dict:
pid = str(multiprocessing.current_process().pid)
print('[{:2d}/{:2d} {:s}] Starting ...'.format(number, max_number, pid))
t0 = time.time()
# # the following takes ~10 seconds on a separate node
# for i in range(2):
# print('[{:d}] Passed loop {:d}/2...'.format(number, i+1))
# time.sleep(5)
# the following takes ~3.3 seconds on a separate node
n = 1000
for _ in range(50):
u = np.random.randn(n, n)
v = np.linalg.inv(u)
t1 = time.time()
print('[{:2d}/{:2d} {:s}] Finished in {:.1f} seconds.'.format(number, max_number, pid, t1 - t0))
return {}
if __name__ == "__main__":
runs = []
count = 0
while count < 50:
runs.append(
(count, 50)
)
count += 1
print(f"Number of runs to perform: {len(runs):d}")
num_cpus = 4
print(f"Running job with {num_cpus:d} CPUs in parallel ...")
# with Pool(processes=num_cpus) as pool:
with multiprocessing.get_context("spawn").Pool(processes=num_cpus) as pool:
results = pool.starmap(pool_func, runs)
print('Main process done.')
There are three features I want to point out. First, num_cpus
can be changed to increase the number of workers in the pool. Second, I can change from the default 'fork' pool to the 'spawn' method, this doesn't seem to change anything. Finally, inside pool_func, the process that is running can be either a CPU intensive matrix inversion or a CPU-absent wait function.
When I use the wait function, the processes run in approximately the correct time, about 10 seconds per process. When I use the matrix inversion, the process time increases with the number of processes in the following, approximate, way:
1 CPU : 3 seconds
2 CPUs: 4 seconds
4 CPUs: 30 seconds
8 CPUs: 95 seconds
Here is a partial output of the script above, run as-is:
Number of runs to perform: 50
Running job with 4 CPUs in parallel ...
[ 0/50 581194] Starting ...
[ 4/50 581193] Starting ...
[ 8/50 581192] Starting ...
[12/50 581191] Starting ...
[ 0/50 581194] Finished in 24.7 seconds.
[ 1/50 581194] Starting ...
[ 4/50 581193] Finished in 29.3 seconds.
[ 5/50 581193] Starting ...
[12/50 581191] Finished in 30.3 seconds.
[13/50 581191] Starting ...
[ 8/50 581192] Finished in 32.2 seconds.
[ 9/50 581192] Starting ...
[ 1/50 581194] Finished in 26.9 seconds.
[ 2/50 581194] Starting ...
[ 5/50 581193] Finished in 30.3 seconds.
[ 6/50 581193] Starting ...
[13/50 581191] Finished in 30.8 seconds.
[14/50 581191] Starting ...
[ 9/50 581192] Finished in 32.8 seconds.
[10/50 581192] Starting ...
...
The process ids look unique to me.
Clearly, there is some problem with scaling as adding more CPUs is causing the processes to run slower. There isn't any I/O at all in the processes that are being timed. These are pedestrian processes that I expected would work right out of the box. I have no idea why this is not working as I think it should. Why does this script have individual processes that take longer when I use more CPUs?
When I run this on my macos laptop, it works as expected. But has similar scaling issues on a different remote linux computer I have access to. This might be a platform specific problem, but I'll leave it up in case someone has seen it before and knows how to fix it.
One thing that can cause this kind of problem is nested parallelism, when each process in your process pool starts multiple threads to speed up operations within that process.
You can investigate whether this is happening is by looking at one minute load average. A program like htop
can show you this. Run your program with varying process counts. If there one thread per process, and then you get load average close to the number of processes after the program has run for a minute or so. If you have many threads per process, then you can get load averages which are much higher than the number of processes.
If this happens, you can frequently get better results by limiting or eliminating NumPy parallelism, so that the number of threads times the number of processes is equal to your number of CPU cores.
There are two ways you can do this.
threadpoolctl can adjust thread count in NumPy. You can do this once at process startup, or limit parallelism only in sections that you know won't benefit from parallelism.
Example:
from threadpoolctl import threadpool_limits
import numpy as np
with threadpool_limits(limits=1, user_api='blas'):
# In this block, calls to blas implementation (like openblas or MKL)
# will be limited to use only one thread. They can thus be used jointly
# with thread-parallelism.
a = np.random.randn(1000, 1000)
a_squared = a @ a
Be aware that these limits are not shared between processes, so you'll need to call threadpool_limits()
in the subprocess, not the parent process.
Most BLAS libraries will read configuration for the number of threads from an environment variable, and limit their thread pool accordingly. You can see a common list of environment variables here.
I am not a big fan of this approach, for two reasons. The first reason is that this depends on library load order. The environment variable needs to be set before NumPy is imported. The second reason is that it doesn't allow you to dynamically set threadpool size, which makes it harder to make targeted, small fixes.