python, python-3.x, multiprocessing, latency, throughput

Multiprocessing pool map adds extra overhead time, how to reduce it?


I am working on a low-latency requirement, so I tried to use a multiprocessing pool. Each task runs in parallel and in my example takes 1 second, but the overall run adds an extra 300 ms, i.e., it takes 1 second 300 ms in total.

Why is the extra time added? Is there a better way to reduce this overhead?

I referred to this solution, which has at most 2 ms of overhead (0.1 ms on average), but it does not work for my code (which fetches data from a database using pandas read_sql).

I assume there is a better way to reduce this extra overhead.

Code:

import multiprocessing as mp
import time
from multiprocessing import Pool

def test_func(x):
    ts = time.time()
    time.sleep(1) # In my case `pd.read_sql(query, conn_str)` which takes around 300ms
    print(time.time() - ts)
    return x

ts = time.time()

num_process = mp.cpu_count()*2

with Pool(num_process) as pool:
    results = pool.map(test_func, [1, 2, 3])
    print("*********")
    print(time.time() - ts)


print("*********")
print(time.time() - ts)

Output:

1.0009253025054932 # time taken by each task
1.0011048316955566
1.0010230541229248


*********
1.2178916931152344 # time taken inside pool context manager
*********
1.2484805583953857 # total time 

Number of processes available:

mp.cpu_count()*2 --> 32

Is it possible to bring the total time closer to 1 second? If yes, how?


Solution

  • As a general rule, multiprocessing is better suited to CPU-intensive work whereas multithreading is better for I/O bound work.

    That is a huge generalisation. What you actually need will depend on the various complexities of your code.

    The concurrent.futures module has two very useful classes: ThreadPoolExecutor and ProcessPoolExecutor. Their APIs are identical, meaning that you can easily swap from one to the other during development simply by changing the class name, without changing any other code.

    Here's some code that gives you a framework for determining which is best suited to your needs. Just implement the do_work() function appropriately.

    The code will execute the process() function NVALS times and provide you with an average duration for both multithreading and multiprocessing.

    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    from time import perf_counter
    from collections.abc import Callable, Iterator
    import os
    
    NVALS = 10
    
    if (CPU_COUNT := os.cpu_count()) is None or CPU_COUNT < 2:
        CPU_COUNT = 2
    
    POOLSIZE = min(CPU_COUNT // 2, NVALS)
    DP = 8
    
    def do_work(i: int, s: str) -> tuple[int, str]:
        print(i, s)
        return i, s
    
    def process(t: tuple[int, str]) -> float:
        _start = perf_counter()
        do_work(*t)
        return perf_counter() - _start
    
    def values(nvals: int) -> Iterator[tuple[int, str]]:
        for i in range(nvals):
            yield i, 'a'
        
    def main(executor: Callable):
        with executor(POOLSIZE) as _executor:
            print(executor.__name__)
            # sum the durations measured by process() for each value
            _sum = sum(_executor.map(process, values(NVALS)))
            print(f'Average={_sum/NVALS:.{DP}f}\n')
    
    
    
    if __name__ == '__main__':
        for executor in ProcessPoolExecutor, ThreadPoolExecutor:
            main(executor)