I'm creating a program that processes files — sometimes very large files, on the order of GB. It takes a lot of time. At first I tried using ThreadPoolExecutor, which improved the speed somewhat. For example, a ~200 MB file would take about ~3 minutes running synchronously, and about ~130+ seconds with ThreadPoolExecutor. That's too slow for me. I tried ProcessPoolExecutor and it did wonderfully: the same work was done in about 12–18 seconds. That makes sense, since the task is very CPU-intensive. Now the problem comes down to visualizing the progress of the task. I used tqdm. With threads everything works wonderfully — I can see the beautiful progress. But when I change to use ProcessPoolExecutor, the program crashes.
My code looks as follows:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from pathlib import Path
from tqdm import tqdm
import os
class FileProcessor:
    """Processes a binary file chunk-by-chunk across a pool of workers,
    showing progress with tqdm.

    NOTE(review): as written this works with ThreadPoolExecutor but crashes
    with ProcessPoolExecutor, because the tqdm instance is passed to the
    workers and tqdm objects are not picklable — presumably the crash the
    question describes; confirm against the traceback.
    """

    NJOBS = 8 # 4 for multiprocessing
    CHK_SIZE = 1024 * 1024 * 80 # 80 MB

    def __init__(self, fp: str | Path):
        """Remember the target path and its size (used as the bar's total)."""
        self.filepath = Path(fp)
        self.filesize = os.path.getsize(self.filepath)

    @classmethod
    def _process_chunk(cls, chunk: bytes, pb: tqdm, *somemoreargs):
        # processes each byte, updates progressbar afterwards
        # NOTE(review): `time` is used below but never imported in this file.
        array = bytearray(chunk)
        for i in range(len(array)):
            # do sam' with byte at i
            time.sleep(0.0001) # for large file comment this
            # `pb` is None in the multiprocessing variant (see the commented
            # submit call in perform()), hence the guard.
            if pb:
                pb.update()
        return bytes(array) # and some more vals

    def perform(self):
        """Read the file in CHK_SIZE chunks, split each chunk into sub-chunks,
        and submit each sub-chunk to the executor."""

        def subchunk(chunk: bytes):
            # Split `chunk` into NJOBS roughly equal pieces; if the chunk is
            # smaller than NJOBS bytes, yield it whole.
            schk_size = len(chunk) // FileProcessor.NJOBS
            if not schk_size:
                schk_size = len(chunk) # will work on this later
            i = 0
            while (schunk := chunk[i:i + schk_size]):
                yield schunk # and some more info
                i += schk_size

        progressbar = tqdm(range(self.filesize))
        file = self.filepath.open(mode="rb")
        executor = ThreadPoolExecutor(max_workers=FileProcessor.NJOBS)
        # executor = ProcessPoolExecutor(max_workers=os.get_cpu_count())
        with progressbar, file, executor:
            while (chunk := file.read(FileProcessor.CHK_SIZE)):
                # Passing `progressbar` to workers is what breaks under
                # ProcessPoolExecutor — the bar cannot be pickled across
                # process boundaries.
                futures = [executor.submit(FileProcessor._process_chunk, sc, progressbar) for sc in subchunk(chunk)]
                # futures = [executor.submit(FileProcessor._process_chunk, sc, None) for sc in subchunk(chunk)]
                for future in as_completed(futures):
                    # do something with results
                    res = future.result()
                    # progressbar.update(len(res)) # uncomment for multiprocessing
        # do final stuff
This works well with multiple threads. The progress bar fills smoothly. But when I change to multiple processes, the program crashes. I am guessing this is due to the fact that processes do not share memory space.
So the question is: how can I use tqdm to show the progress smoothly while using multiprocessing? For now I am updating the bar after each task ends, in `for future in as_completed(futures)`,
but the progress display is rather ugly, with big jumps.
Since you want to use a ProcessPoolExecutor
instance (your code still shows you using a ThreadPoolExecutor
instance), the main process — which has nothing else to do except wait for submitted tasks to complete — can easily be the updater of the progress bar. You now need to arrange for your worker function/method (in your case `_process_chunk`)
to return an additional value: the amount to advance the progress bar by.
import os
import time
from concurrent.futures import (ProcessPoolExecutor, as_completed)
from pathlib import Path
from tqdm import tqdm
class FileProcessor:
NJOBS = 4 # 4 for multiprocessing
CHK_SIZE = 1024 * 1024 * 80 # 80 MB
def __init__(self, fp: str):
self.filepath = Path(fp)
self.filesize = os.path.getsize(self.filepath)
@staticmethod
def _process_chunk(chunk: bytes, *somemoreargs):
array = bytearray(chunk)
for b in array:
#time.sleep(0.0001)
...
# Also return the number of bytes processed:
return bytes(array), len(array)
def perform(self):
def subchunk(chunk: bytes):
schk_size = len(chunk) // FileProcessor.NJOBS
if not schk_size:
schk_size = len(chunk) # will work on this later
i = 0
while schunk := chunk[i : i + schk_size]:
yield schunk # and some more info
i += schk_size
progressbar = tqdm(total=self.filesize)
file = self.filepath.open(mode="rb")
executor = ProcessPoolExecutor(max_workers=FileProcessor.NJOBS)
with progressbar, file, executor:
while chunk := file.read(FileProcessor.CHK_SIZE):
futures = [
executor.submit(FileProcessor._process_chunk, sc)
for sc in subchunk(chunk)
]
for future in as_completed(futures):
_, bytes_processed = future.result()
progressbar.update(bytes_processed)
future.result()
# do final stuff
if __name__ == "__main__":
    # Script entry point: process a single file, displaying progress.
    processor = FileProcessor("some_big_file.tar")
    processor.perform()