python-3.x, filewriter

Why don't my files appear when I write to them in this multiprocessing code in Python?


Here is my code in Python. I'm trying to create 2 files with 5000 lines each.

import multiprocessing
import itertools
import os


# Define a generator function to yield lines of text
def generate_lines():
    # Replace this with your logic to generate lines of text
    for i in range(10000):
        yield f"Line {i + 1}"


# Function to write lines to a file
def write_lines(filename, lines):
    with open(filename, 'w') as file:
        try:
            for line in lines:
                file.write(line + '\n')
        except IOError as e:
            errno, strerror = e.args
            print(f"I/O error({errno}): {strerror}")


if __name__ == '__main__':
    # Get the absolute path of the script's directory
    script_dir = os.path.dirname(os.path.abspath(__file__))
    print('script_dir: ', script_dir)
    # Create a pool of processes
    with multiprocessing.Pool(2) as pool:
        # Use itertools.islice to split the generator into chunks of 5000 lines each
        total_lines = 10000
        chunk_size = 5000
        lines_generator = generate_lines(total_lines)
        for i in range(0, total_lines // chunk_size):
            chunk = itertools.islice(lines_generator, i * chunk_size, (i + 1) * chunk_size)
            file_path = os.path.join(script_dir, f'file-{i}.txt')
            pool.apply_async(write_lines, (file_path, chunk))

        # Wait for all processes to complete
        pool.close()
        pool.join()

    print("Writing completed successfully.")

Why don't these files appear in the working directory? Conceptually, what is going on? Are the files getting swallowed by the child processes and never appearing in the main process? How large are these processes in terms of memory?


Solution

  • First, there is an error (which I think may be a typo) when calling generate_lines: you call it with an argument:

    lines_generator = generate_lines(total_lines)
    

    but the function does not define any parameters:

    def generate_lines():
    

    After correcting this, one error still remains hidden: generator objects cannot be pickled, so they cannot be sent to the worker processes.
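
    You can verify this directly (a minimal, self-contained sketch using only the standard pickle module):

    import pickle

    def gen():
        yield 1

    try:
        pickle.dumps(gen())  # generators carry live frame state that pickle cannot serialize
    except TypeError as e:
        print(e)  # e.g. "cannot pickle 'generator' object"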

    The error stays hidden because apply_async does not raise in the parent process; if you call the get method of the returned AsyncResult object, you should see the exception:

    res = pool.apply_async(write_lines, (file_path, chunk))
    res.get()
    

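    Alternatively, apply_async accepts an error_callback argument, which surfaces worker exceptions without blocking on get (a minimal sketch reusing the names from your script):

    pool.apply_async(write_lines, (file_path, chunk),
                     error_callback=lambda exc: print(f"worker failed: {exc!r}"))
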
    I think the closest thing to a generator that can be pickled is a custom iterator class, for example:

    import multiprocessing
    import itertools
    import os
    
    class LinesIter:
        """Picklable iterator that yields n numbered lines."""

        def __init__(self, n_lines):
            self._i = 0              # lines yielded so far
            self._n_lines = n_lines  # total number of lines to yield

        def __iter__(self):
            return self

        def __next__(self):
            if self._i == self._n_lines:
                raise StopIteration
            self._i += 1
            return f"Line {self._i}"
    
    
    # Function to write lines to a file
    def write_lines(filename, lines):
        with open(filename, 'w') as file:
            try:
                for line in lines:
                    file.write(line + '\n')
            except IOError as e:
                errno, strerror = e.args
                print(f"I/O error({errno}): {strerror}")
    
    
    if __name__ == '__main__':
        # Get the absolute path of the script's directory
        script_dir = os.path.dirname(os.path.abspath(__file__))
        print('script_dir: ', script_dir)
        # Create a pool of processes
        with multiprocessing.Pool(2) as pool:
            total_lines = 10000
            chunk_size = 5000
            lines_iter = LinesIter(total_lines)
            for i in range(total_lines // chunk_size):
                # Pickling this islice also pickles the underlying LinesIter,
                # so each worker receives its own copy and skips ahead to its chunk
                chunk = itertools.islice(lines_iter, i * chunk_size, (i + 1) * chunk_size)
                file_path = os.path.join(script_dir, f'file-{i}.txt')
                pool.apply_async(write_lines, (file_path, chunk))
            pool.close()
            pool.join()
            
        print("Writing completed successfully.")
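
    A note on why this works: pickling the islice also pickles the underlying LinesIter, and since the parent never consumes lines_iter, each worker receives an independent copy positioned at the start. Be aware, though, that pickle support for itertools objects is deprecated as of Python 3.12, so on newer versions it is safer to avoid pickling iterators altogether and send each worker just its range (a minimal alternative sketch; write_range is a hypothetical helper, not part of the original code):

    def write_range(filename, start, count):
        # Generate the chunk's lines inside the worker itself,
        # so only two integers cross the process boundary
        with open(filename, 'w') as file:
            for i in range(start, start + count):
                file.write(f"Line {i + 1}\n")

    # inside the pool loop, instead of building an islice:
    # pool.apply_async(write_range, (file_path, i * chunk_size, chunk_size))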