I want to leverage multiple cores for an expensive computation without copying the large data to all forked processes.
In more detail: The large data is a large (networkx) graph plus certain edges cached in a dict. The expensive computation is a complex path-finding algorithm, the input data is a list of vertices that are the sources for the expensive path-finding algorithm.
To make use of copy-on-write (I'm running on Linux), I read here that

Before you fork your child processes, prepare your big data in a module level variable (global to all the functions).

which I thought I did (rough sketch):
import concurrent.futures
import multiprocessing as mp

from compute_module import expensive_graph_algorithm


def setup_graph(name):
    global large_graph
    large_graph = read_graph(name)  # about 20 GB of graph data


def main():
    setup_graph(...)
    vertices = ...
    with concurrent.futures.ProcessPoolExecutor(mp_context=mp.get_context('fork')) as executor:
        futures = []
        for vertex in vertices:
            future = executor.submit(expensive_graph_algorithm, vertex, other_arguments)
            futures.append(future)

        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            # store results
But when I look at the forked processes and their memory usage, the data gets copied for every process and floods my RAM. What am I doing wrong?
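For reference, this is roughly how I inspect the workers' memory while the pool is running (a minimal sketch for illustration only; psutil is not part of my actual code, and any process viewer shows the same numbers):

import os
import psutil

def report_worker_memory():
    # Look at every forked worker of the current process and print its
    # resident set size plus the portion shared with the parent (Linux).
    parent = psutil.Process(os.getpid())
    for child in parent.children():
        mem = child.memory_info()  # on Linux: rss, vms, shared, text, lib, data, dirty
        print(f"pid={child.pid} rss={mem.rss / 2**30:.1f} GiB "
              f"shared={mem.shared / 2**30:.1f} GiB")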
When ProcessPoolExecutor forks its subprocesses, they will have a view of everything at module scope. Since you created large_graph before the subprocesses started, a function that accesses large_graph doesn't need to have the data passed in through the submit call. The trick is to write a worker that knows the convention you are using: it picks up large_graph from the module namespace and uses it to call the expensive function.

In addition to large_graph, I created a global variable to hold vertices. I don't know how large that data is, or whether it's needed, but it shows another good way to put things in global data: if you have a list of objects, all you have to pass to the worker is an index into that list.

Now you just have to make sure that you don't pass large_graph or vertices through submit to the pool. The worker will find the right data for you.
import concurrent.futures
import multiprocessing as mp

from compute_module import expensive_graph_algorithm


def setup_data(name):
    global large_graph
    large_graph = read_graph(name)  # about 20 GB of graph data


def expensive_graph_algorithm_worker(index, other_arguments):
    # large_graph and vertices come from the module globals inherited by the
    # forked worker; only the small index (and any small extra arguments)
    # travels through the pool. Adjust the call to match the actual
    # signature of expensive_graph_algorithm.
    return expensive_graph_algorithm(vertices[index], large_graph, other_arguments)


def main():
    global vertices
    setup_data(...)                     # load the graph before the pool forks
    vertices = get_vertices_list(...)
    with concurrent.futures.ProcessPoolExecutor(mp_context=mp.get_context('fork')) as executor:
        futures = []
        for i in range(len(vertices)):
            future = executor.submit(expensive_graph_algorithm_worker, i, other_arguments)
            futures.append(future)

        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            # store results
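To check that the fork is really sharing the graph pages, don't judge by plain RSS alone: RSS counts copy-on-write pages in full for every child, so the workers can look huge even when the memory is shared. A minimal sketch of a per-worker check, assuming psutil is installed (the helper name is just for illustration):

import os
import psutil

def print_worker_memory():
    # USS is memory unique to a process; PSS splits shared pages evenly
    # across the processes that map them. If copy-on-write is working, each
    # worker's USS stays small even though its RSS looks as big as the parent's.
    for child in psutil.Process(os.getpid()).children():
        info = child.memory_full_info()  # on Linux this adds uss, pss, swap
        print(f"pid={child.pid} rss={info.rss / 2**30:.1f} GiB "
              f"uss={info.uss / 2**30:.1f} GiB pss={info.pss / 2**30:.1f} GiB")

If the workers' USS grows toward the size of the graph, something is still touching or copying the data inside the children; if it stays small, the flooding you see is coming from somewhere other than the graph itself.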