Here is my code in Python. I'm trying to create 2 files, each with 5000 lines.
import multiprocessing
import itertools
import os

# Define a generator function to yield lines of text
def generate_lines():
    # Replace this with your logic to generate lines of text
    for i in range(10000):
        yield f"Line {i + 1}"

# Function to write lines to a file
def write_lines(filename, lines):
    with open(filename, 'w') as file:
        try:
            for line in lines:
                file.write(line + '\n')
        except IOError as e:
            errno, strerror = e.args
            print(f"I/O error({errno}): {strerror}")

if __name__ == '__main__':
    # Get the absolute path of the script's directory
    script_dir = os.path.dirname(os.path.abspath(__file__))
    print('script_dir: ', script_dir)

    # Create a pool of processes
    with multiprocessing.Pool(2) as pool:
        # Use itertools.islice to split the generator into chunks of 5000 lines each
        total_lines = 10000
        chunk_size = 5000
        lines_generator = generate_lines(total_lines)
        for i in range(0, total_lines // chunk_size):
            chunk = itertools.islice(lines_generator, i * chunk_size, (i + 1) * chunk_size)
            file_path = os.path.join(script_dir, f'file-{i}.txt')
            pool.apply_async(write_lines, (file_path, chunk))

        # Wait for all processes to complete
        pool.close()
        pool.join()

    print("Writing completed successfully.")
Why don't these files appear in the working directory? Conceptually, what is going on? Are the files getting swallowed by the worker processes and never appearing in the main process? How large are these processes in terms of memory?
First, there is an error (which I think may be a typo) when calling generate_lines: you call it with an argument:

lines_generator = generate_lines(total_lines)

but the function does not define any parameters:

def generate_lines():
Correcting this, one error still remains hidden: generators cannot be pickled, and multiprocessing has to pickle the arguments in order to send them to the worker processes. Because apply_async only raises the error when you retrieve its result, the failure is silent and no files get written. If you call the get method of the AsyncResult objects you should see the exception:
res = pool.apply_async(write_lines, (file_path, chunk))
res.get()
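To see the root cause in isolation, here is a minimal sketch (independent of the script above) showing that a generator cannot be pickled:

import pickle

# Generators carry live frame state, so pickle refuses them
gen = (f"Line {i + 1}" for i in range(10))
pickle.dumps(gen)  # raises TypeError (e.g. "cannot pickle 'generator' object")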
I think the closest thing to a generator that can still be pickled is a custom iterator, for example:
import multiprocessing
import itertools
import os

# Picklable replacement for the generator: a plain iterator class
class LinesIter:
    def __init__(self, lines):
        self._i = 0
        self._lines = lines

    def __iter__(self):
        return self

    def __next__(self):
        if self._i == self._lines:
            raise StopIteration
        self._i += 1
        return f"Line {self._i}"

# Function to write lines to a file
def write_lines(filename, lines):
    with open(filename, 'w') as file:
        try:
            for line in lines:
                file.write(line + '\n')
        except IOError as e:
            errno, strerror = e.args
            print(f"I/O error({errno}): {strerror}")

if __name__ == '__main__':
    # Get the absolute path of the script's directory
    script_dir = os.path.dirname(os.path.abspath(__file__))
    print('script_dir: ', script_dir)

    # Create a pool of processes
    with multiprocessing.Pool(2) as pool:
        total_lines = 10000
        chunk_size = 5000
        lines_iter = LinesIter(total_lines)
        for i in range(0, total_lines // chunk_size):
            chunk = itertools.islice(lines_iter, i * chunk_size, (i + 1) * chunk_size)
            file_path = os.path.join(script_dir, f'file-{i}.txt')
            pool.apply_async(write_lines, (file_path, chunk))

        pool.close()
        pool.join()

    print("Writing completed successfully.")