pythonmultiprocessingconcurrent.futuresprocess-pool

python concurrent.futures.ProcessPoolExecutor crashing with full RAM


Python concurrent.futures.ProcessPoolExecutor crashing with full RAM

Program description

Hi, I've got a computationally heavy function which I want to run in parallel. The function is a test that accepts as inputs:

The return value is a short list of calculation results.

I want to run the same function in a for loop with different parameters and the same input DataFrame, basically run a brute-force to find optimal parameters for my problem.

The code I've written

I currently am running the code concurrently with ProcessPoolExecutor from the module concurrent.futures.

import concurrent.futures
from itertools import repeat
import pandas as pd

from my_tests import func


parameters = [
    (arg1, arg2, arg3),
    (arg1, arg2, arg3),
    ...
]
large_df = pd.read_csv(csv_path)

with concurrent.futures.ProcessPoolExecutor() as executor:
    for future in executor.map(func, repeat(large_df.copy()), parameters):
        test_result = future.result()
        ...

The problem

The problem I face is that I need to run a large amount of iterations, but my program crashes almost instantly.

In on order for it not to crash, I need to limit it to max 4 workers, which is 1/4 of my CPU resources.

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    ...

I figured out my program crashes due to a full RAM (16 GB). What I found weird is that when I was running it on more workers, it was gradually eating more and more RAM, which it never released, until it crashed.

Instead of passing a copy of the DataFrame, I tried to pass the file path, but apart of slowing down my program, it didn't change anything.

Do you have any idea of why that problem occurs and how to solve it?


Solution

  • See my comment on what map actually returns.

    This answer is relevant according to how large your parameters list is, i.e. how many total tasks are being placed on the multiprocessing pool's task queue:

    You are currently creating and passing a copy of your dataframe (with large_df.copy()) every time you are submitting a new task (one task for each element of parameters. One thing you can do is to initialize your pool processes once with a single copy per pool process that will be used by every task submitted and executed by the pool process. The assumption is that the dataframe itself is not modified by my_tests.func. If it is modified and you need a copy of the original large_df for each new task, the function worker (see below) can make the copy. In this case you would need 2 * N copies (instead of just N copies) to exist simultaneously where N is the number of processes in the pool. This will save you memory if the length of parameters is greater than that since in your code a copy of the dataframe will exist either on the task queue or in a pool process's address space.

    If you are running under a platform such as Linux that uses the fork method to create new processes, then each child process will inherit a copy automatcally as a global variable:

    import concurrent.futures
    import pandas as pd
    
    from my_tests import func
    
    
    parameters = [
        (arg1, arg2, arg3),
        (arg1, arg2, arg3),
        ...
    ]
    
    large_df = pd.read_csv(csv_path) # will be inherited
    
    def worker(parameter):
        return func(large_df, parameter)
        """
        # or:
        return func(large_df.copy(), parameter)
        """
    
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for result in executor.map(worker, parameters):
            ...
    

    my_tests.func is expecting as its first argument a dataframe, but with the above change the dataframe is no longer being passed; the dataframe is now accessed as a global variable. So without modifying func, we need am adapter function, worker, that will pass to func what it is expecting. Of course, if you are able to modify func, then you can do without the adapter.

    If you were running on a platform such as Windows that uses the spawn method to create new processes, then:

    import concurrent.futures
    import pandas as pd
    
    from my_tests import func
    
    def init_pool_processes(df):
        global large_df
        large_df = df
    
    
    def worker(parameter):
        return func(large_df, parameter)
        """
        # or:
        return func(large_df.copy(), parameter)
        """
    
    if __name__ == '__main__':
        
        parameters = [
            (arg1, arg2, arg3),
            (arg1, arg2, arg3),
            ...
        ]
        
        large_df = pd.read_csv(csv_path) # will be inherited
        
        with concurrent.futures.ProcessPoolExecutor(initializer=init_pool_processes, initargs=(large_df,)) as executor:
            for result in executor.map(worker, parameters):
                ...