pythonmultiprocessingconcurrent.futurestqdm

How to use tqdm and ProcessPoolExecutor


I'm creating a program that processes files sometimes very huge files in the order of GB. It takes a lot of time. At first I tried using ThreadPoolExecutor which relatively improved the speed. For example a ~200 Mb file would take about ~3 minutes running synchronously, with ThreadPoolExecutor, about ~130+ seconds. That's too slow for me. I tried ProcessPoolExecutor and it did wonderful. Same work done in about 12-18 seconds. Well makes sense since the task is eating a lot of cpu. Now the problem comes to visualizing the progress of the task. I used tqdm. With threads everything works wonderfully. I can see the beautiful progress. But when I change to use Processpool, the program crashes.

My code looks like as follows:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from pathlib import Path
from tqdm import tqdm
import os


class FileProcessor:
    NJOBS = 8 # 4 for multiprocessing
    CHK_SIZE = 1024 * 1024 * 80 # 80 MB


    def __init__(self, fp: str | Path):
        self.filepath = Path(fp)
        self.filesize = os.path.getsize(self.filepath)

    @classmethod
    def _process_chunk(cls, chunk: bytes, pb: tqdm, *somemoreargs):
        # processes each byte, updates progressbar afterwards
        array = bytearray(chunk)
        for i in range(len(array)):
            # do sam' with byte at i
            time.sleep(0.0001) # for large file comment this
            if pb:
                pb.update()
        return bytes(array) # and some more vals

    def perform(self):
        def subchunk(chunk: bytes):
            schk_size = len(chunk) // FileProcessor.NJOBS
            if not schk_size:
                schk_size = len(chunk) # will work on this later
            i = 0
            while (schunk := chunk[i:i + schk_size]):
                yield schunk # and some more info
                i += schk_size

        progressbar = tqdm(range(self.filesize))
        file = self.filepath.open(mode="rb")
        executor = ThreadPoolExecutor(max_workers=FileProcessor.NJOBS)
        # executor = ProcessPoolExecutor(max_workers=os.get_cpu_count())
        with progressbar, file, executor:
            while (chunk := file.read(FileProcessor.CHK_SIZE)):
                futures = [executor.submit(FileProcessor._process_chunk, sc, progressbar) for sc in subchunk(chunk)]
                # futures = [executor.submit(FileProcessor._process_chunk, sc, None) for sc in subchunk(chunk)]
                for future in as_completed(futures):
                    # do something with results
                    res = future.result()
                    # progressbar.update(len(res)) # uncomment for multiprocessing
            # do final stuff

This works well with multi threads. The progressbar fills smoothly. But when I change to multi processes, the program crashes. I am guessing is due to the fact that "processes not sharing memory space".

So, the question is how can I use tqdm to show the progress smoothly whilst using multi processing. For now I am updating the bar after the process ends: in for future in as_completed(futures) but the progress display is rather ugly with big jumps


Solution

  • Since you want to use a ProcessPoolExecutor instance (you code still shows you using a ThreadPoolExecutor instance), then the main process which has nothing else to except wait for submitted tasks to complete can easily be the updater of the progress bar. You now need to arrange for your worker function/method (in your case _process_chunk to return an additional value that is the amount to advance the progress bar by:

    import os
    import time
    from concurrent.futures import (ProcessPoolExecutor, as_completed)
    from pathlib import Path
    
    from tqdm import tqdm
    
    
    class FileProcessor:
        NJOBS = 4  # 4 for multiprocessing
        CHK_SIZE = 1024 * 1024 * 80  # 80 MB
    
        def __init__(self, fp: str):
            self.filepath = Path(fp)
            self.filesize = os.path.getsize(self.filepath)
    
        @staticmethod
        def _process_chunk(chunk: bytes, *somemoreargs):
            array = bytearray(chunk)
            for b in array:
                #time.sleep(0.0001)
                ...
    
            # Also return the number of bytes processed:
            return bytes(array), len(array)
    
        def perform(self):
            def subchunk(chunk: bytes):
                schk_size = len(chunk) // FileProcessor.NJOBS
                if not schk_size:
                    schk_size = len(chunk)  # will work on this later
                i = 0
                while schunk := chunk[i : i + schk_size]:
                    yield schunk  # and some more info
                    i += schk_size
    
            progressbar = tqdm(total=self.filesize)
            file = self.filepath.open(mode="rb")
            executor = ProcessPoolExecutor(max_workers=FileProcessor.NJOBS)
            with progressbar, file, executor:
                while chunk := file.read(FileProcessor.CHK_SIZE):
                    futures = [
                        executor.submit(FileProcessor._process_chunk, sc)
                        for sc in subchunk(chunk)
                    ]
                    for future in as_completed(futures):
                        _, bytes_processed = future.result()
                        progressbar.update(bytes_processed)
                        future.result()
    
                # do final stuff
    
    
    if __name__ == "__main__":
        f = FileProcessor("some_big_file.tar")
        f.perform()