Tags: python, multiprocessing, python-multiprocessing

Python multiprocessing on a generator that reads files in


I am trying to read and process thousands of files, but unfortunately it takes about 3x as long to process each file as it does to read it in from disk, so I would like to process these files as they are read in (while continuing to read in additional files).

In a perfect world, I have a generator which reads one file at a time, and I would like to pass this generator to a pool of workers which process items from the generator as they are (slowly) generated.

Here's an example:

import os
from multiprocessing import Pool

def process_file(file_string):
    ...
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))

The only issue with the code above is that all the files are read into memory before the pool begins working, which means I have to wait for the disk to read everything in, and I also consume a large amount of memory.


Solution

  • Pool.map and Pool.map_async listify the iterable passed to them, so your generator will always be realized fully before processing even begins.

    The various Pool.imap* functions appear to process inputs as generators, so you might be able to change:

    results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))
    

    to:

    # If you can process outputs one at a time, drop the list wrapper
    # If you can process outputs without order mattering, imap_unordered will
    # get you the best results
    results = list(pool.imap(process_file, (open(path+part,'rb').read() for part in os.listdir(path))))
    

    and get the same results without slurping everything before processing begins. But AFAICT, the imap* functions still try to populate their internal queues as fast as they can, which can leave a lot of data outstanding and use excessive memory; beyond that, you'd still be reading all the data in a single process and then sending all of it over IPC, which means you're still mostly bottlenecked on I/O.
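    For illustration, here's a minimal sketch of that difference (my own example, not from your code): slow_gen stands in for slow disk reads, and square is just a placeholder worker. With map the results only start coming back once the generator is exhausted; with imap they arrive while later items are still being generated, though the feeder thread still pulls items as fast as it can.

    import time
    from multiprocessing import Pool

    def square(x):
        return x * x

    def slow_gen():
        # Stand-in for slow file reads: each item takes a moment to produce.
        for i in range(4):
            time.sleep(0.5)
            print('generated', i)
            yield i

    if __name__ == '__main__':
        with Pool(processes=2) as pool:
            # pool.map(square, slow_gen()) would return nothing until the generator
            # is fully exhausted; imap starts yielding results while later items
            # are still being generated.
            for result in pool.imap(square, slow_gen()):
                print('got', result)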

    In your position, I'd move the read into the task itself (and, if you can, have each worker process its file by line or by block instead of reading the whole thing into memory at once). You'd get parallel reads, less IPC, and you won't risk slurping all the files before the first few are even processed; you'll also never have more files open than you have workers. So the end result would look like:

    def process_file(path):
        with open(path, 'rb') as f:
            file_string = f.read()
        ... same as before ...
        return processed_file

    pool = Pool(processes=4)
    path = 'some/path/'
    results = pool.imap(process_file, (os.path.join(path, part) for part in os.listdir(path)))
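
    If order doesn't matter, a sketch of that line-by-line variant might look like the following (process_line is a hypothetical placeholder for your per-line logic, and the with Pool block is my own addition to make cleanup explicit). Note that imap and imap_unordered return lazy iterators, so the work only happens as you iterate over the results:

    import os
    from multiprocessing import Pool

    def process_line(line):
        ...  # hypothetical placeholder for whatever you do with each line

    def process_file(path):
        # Read and process one line at a time instead of slurping the whole file.
        with open(path, 'rb') as f:
            return [process_line(line) for line in f]

    if __name__ == '__main__':
        path = 'some/path/'
        with Pool(processes=4) as pool:
            # imap_unordered hands back each file's result as soon as it is ready,
            # regardless of the order the paths were submitted in.
            paths = (os.path.join(path, part) for part in os.listdir(path))
            for processed in pool.imap_unordered(process_file, paths):
                ...  # consume each result here; nothing runs until you iterate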