Tags: python, fork, python-multiprocessing, copy-on-write

Copy-on-write for large data in Python's ProcessPoolExecutor


I want to leverage multiple cores for an expensive computation without copying the large data to all forked processes.

In more detail: the large data is a large (networkx) graph plus certain edges cached in a dict. The expensive computation is a complex path-finding algorithm, and the input is a list of vertices that serve as the sources for that algorithm.

To make use of copy-on-write (I am running on Linux), I read here that

Before you fork your child processes, prepare your big data in a module level variable (global to all the functions).

which I thought I did, running on Linux (rough sketch):

import concurrent.futures
import multiprocessing as mp

from compute_module import expensive_graph_algorithm

def setup_graph(name):
    global large_graph 
    large_graph = read_graph(name) # about 20 GB of graph data

def main():
    setup_graph(graph_name)  # graph_name: name/path of the ~20 GB graph
    vertices = ...
    with concurrent.futures.ProcessPoolExecutor(mp_context=mp.get_context('fork')) as executor:
        futures = []
        for vertex in vertices:
            future = executor.submit(expensive_graph_algorithm, vertex, other_arguments)
            futures += [future]

        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            # store results

But when I look at the forked processes and their memory usage, the data gets copied into every process and floods my RAM. What am I doing wrong?


Solution

  • When ProcessPoolExecutor forks its subprocesses, they inherit a view of everything at module scope. Since you created large_graph before the subprocesses started, a function that accesses large_graph doesn't need to have the data passed in the submit call. The trick is to write a worker that knows the convention you are using: it gets large_graph from the module namespace and uses that to call the expensive function.

    In addition to large_graph, I created a global variable to hold vertices. I don't know how large that data is, or whether it's needed, but it shows another good way to put data in module-level globals: if you have a list of objects, all you have to pass to the worker is an index into that list.

    Now you just have to make sure that you don't pass large_graph or vertices through submit to the pool. The worker will find the right data for you.

    import concurrent.futures
    import multiprocessing as mp
    
    from compute_module import expensive_graph_algorithm
    
    def setup_data(name):
        global large_graph
        large_graph = read_graph(name)  # about 20 GB of graph data
    
    def expensive_graph_algorithm_worker(index, *args):
        # large_graph and vertices live in the module namespace inherited at
        # fork time, so only the small index (plus any other small arguments)
        # crosses the process boundary.
        return expensive_graph_algorithm(vertices[index], large_graph, *args)
        
    def main():
        global vertices
        setup_data(graph_name)              # build the globals before the pool forks
        vertices = get_vertices_list(...)
        with concurrent.futures.ProcessPoolExecutor(mp_context=mp.get_context('fork')) as executor:
            futures = []
            for i in range(len(vertices)):
                future = executor.submit(expensive_graph_algorithm_worker, i, other_arguments)
                futures += [future]
    
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                # store results