python multithreading parallel-processing file-handling concurrent.futures

How to efficiently read and process a large file in parallel while keeping the original line order?


I'm working on a Python project where I need to process a very large file (e.g., a multi-gigabyte CSV or log file). To speed things up, I want to process the file in parallel, but I need to ensure that the output is in the same order as the original file.

Problem:

I tried using concurrent.futures.ThreadPoolExecutor for parallel processing, but the results come back out of order due to the nature of parallel tasks. How can I maintain the original order while still taking advantage of parallel processing?

import concurrent.futures

def process_line(line):
    # Simulate some CPU-bound work on the line
    return line.upper()

with open("large_file.txt", "r") as file:
    lines = file.readlines()

with concurrent.futures.ThreadPoolExecutor() as executor:
    # Process the lines in parallel
    results = list(executor.map(process_line, lines))

# Write results to an output file
with open("output.txt", "w") as file:
    for result in results:
        file.write(result)

This works, but the output order is not always guaranteed to match the input file order, especially when using multiple threads. I've read that concurrent.futures may not guarantee order, and this behavior is a problem for me.


Solution

  • Executor.map is guaranteed to yield results in input order; it just doesn't guarantee the work is completed in any particular order. So as long as your workers aren't independently reading from input or writing to output (that is, all I/O is done in the main process), your output will come out in order, which is all that matters.

    You could slightly reduce the memory overhead by delaying opening the file until after the workers are spawned, and by reading the lines lazily (rather than slurping the file into a list of lines). You'd also want to explicitly guard the entry point with an if __name__ == '__main__': check so your code will work properly on non-fork-based systems (e.g. all Windows, and macOS by default):

    import concurrent.futures
    
    def process_line(line):
        # Simulate some CPU-bound work on the line
        return line.upper()
    
    # Wrap main script behavior in main function
    def main():
        with concurrent.futures.ThreadPoolExecutor() as executor:
            with open("large_file.txt") as infile, open("output.txt", "w") as outfile:
                # Process the lines in parallel, but read and write them in-order
                # Files are iterables of their lines (so you avoid a list of lines from the file)
                # and Executor.map returns an iterator that can be written as results arrive,
                # avoiding an unnecessary list of results
                outfile.writelines(executor.map(process_line, infile))
    
    # Invoke main function only when run as script, not when imported or invoked
    # as part of spawn-based multiprocessing
    if __name__ == '__main__':
        main()
    

    Note that by using ThreadPoolExecutor, you're only going to get a parallel speedup if the work done in process_line is either I/O-bound or CPU-bound work performed in a C extension that releases the GIL. In the toy example here, no real parallelism can be realized.
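
    If process_line were genuinely CPU-bound, pure-Python work, one option is to swap ThreadPoolExecutor for ProcessPoolExecutor (which is exactly when the __main__ guard above matters). Here's a minimal sketch under that assumption, reusing the same hypothetical file names; the chunksize value is arbitrary and simply batches many lines per task so inter-process communication overhead doesn't dominate:

    import concurrent.futures

    def process_line(line):
        # Stand-in for real CPU-bound, pure-Python work
        return line.upper()

    def main():
        # ProcessPoolExecutor sidesteps the GIL; map still yields results in input order
        with concurrent.futures.ProcessPoolExecutor() as executor:
            with open("large_file.txt") as infile, open("output.txt", "w") as outfile:
                # chunksize=1000 (arbitrary) sends lines to workers in batches
                outfile.writelines(executor.map(process_line, infile, chunksize=1000))

    if __name__ == '__main__':
        main()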

    I'll also note that even with the tweaks to avoid the lists, the entire input file will in fact be read before any results are produced (Executor.map creates all of its tasks before it begins yielding results). If you're in a position where you can't deal with that (the file is too large for your available RAM), the approach proposed on the CPython issue tracker for a lazier Executor.map could be adapted to limit how much work is eagerly pulled in.
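
    To make that concrete, here is a hedged sketch of one such adaptation (not the patch itself): a hypothetical lazy_map helper that keeps only a bounded window of futures in flight while still yielding results in input order. The window size of 1000 is an arbitrary assumption:

    import collections
    import concurrent.futures
    import itertools

    def process_line(line):
        # Stand-in for the real per-line work
        return line.upper()

    def lazy_map(executor, fn, iterable, window=1000):
        # Hypothetical helper: submit at most `window` tasks ahead of the consumer
        iterator = iter(iterable)
        pending = collections.deque(
            executor.submit(fn, item) for item in itertools.islice(iterator, window)
        )
        while pending:
            # Waiting on the oldest future first preserves the input order
            yield pending.popleft().result()
            # Top the window back up with at most one new task
            for item in itertools.islice(iterator, 1):
                pending.append(executor.submit(fn, item))

    def main():
        with concurrent.futures.ThreadPoolExecutor() as executor:
            with open("large_file.txt") as infile, open("output.txt", "w") as outfile:
                outfile.writelines(lazy_map(executor, process_line, infile))

    if __name__ == '__main__':
        main()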