pythondictionaryparallel-processingmultiprocessingstarmap

Python - pool.starmap() running much slower than pool.map()


To clarify first of all, I'm NOT asking why map in multiprocessing is slow. I had code working just fine using pool.map(). But, in developing it (and to make it more generic), I needed to use pool.starmap() to pass 2 arguments instead of one. I'm still fairly new to Python and multiprocessing, so I'm not sure if I'm doing something obviously wrong here. I also couldn't find anything on this that's been previously asked, so apologies if this has already been answered. Using python 3.10 by the way.

I'm processing just under 5 million items in a list, and managed to get a result in just over 12 minutes (instead of a predicted 6 1/2 days if it was run iteratively!) when using pool.map(). I'm essentially obtaining the intersection of List_A and List_B, but I need to preserve the frequencies of each item, and so have to do it in O(n^m).

But now, I'm getting significantly longer times when using pool.starmap(). I can't seem to work out why, and if anyone can give me some indiciation it would be greatly appreciated!

Here is the code for pool.map() that works quickly as expected (where List_B is actually part of the listCompare function:

def listCompare(list_A):
    toReturn = []
    for item in list_A:
        if item in list_B:
            toReturn.append(item)
    return toReturn

out = []
chnks = chunks(list_a, multiprocessing.cpu_count())
with multiprocessing.Pool() as pool:
    for result in pool.map(listCompare, chnks):
        out.extend(result)            
print("Parallel:", out)

Here is the code for pool.starmap() that works slowly. listCompare is modified to take 2 arguments here: (I can't use my chunks method here, as I can't pass the yeild into the tuple, so I've set the chunksize differently. Is this the reason for the slow down?)

def listCompare(list_A, list_B):
    toReturn = []
    for item in list_A:
        if item in list_B:
            toReturn.append(item)
    return toReturn

with multiprocessing.Pool() as pool:
    for resultA in pool.starmap(listCompare, [(list_a1, list_b1)], chunksize=multiprocessing.cpu_count()):
        output_list1.extend(resultA)
    for resultB in pool.starmap(listCompare, [(list_a2, list_b2)], chunksize=multiprocessing.cpu_count()):
        output_list2.extend(resultB)
    for resultC in pool.starmap(listCompare, [(list_a3, list_b3)], chunksize=multiprocessing.cpu_count()):
        output_list3.extend(resultC)
    for resultD in pool.starmap(listCompare, [(list_a4, list_b4)], chunksize=multiprocessing.cpu_count()):
        output_list4.extend(resultD)

Thanks in advance, and apologies if I've missed out anything that may help in answering! As I said earlier, I know this can be done with intersection, but I need the frequencies of each occurance, so I need to preserve duplicates.


Solution

  • As pointed out in the comments, the speed is slow here because you are passing the whole iterable to a single process (as the argument length is one), rather than spread it over multiple processes.

    I would recommend to keep using map here, using functools.partial to fix argument containing list_B, when passing the target function:

    from functools import partial
    
    def listCompare(list_B, list_A):
        toReturn = []
        for item in list_A:
            if item in list_B:
                toReturn.append(item)
        return toReturn
    
    
    with multiprocessing.Pool() as pool:
        for resultA in pool.map(partial(listCompare, list_b1), list_a1, chunksize=len(list_a1//multiprocessing.cpu_count())):
            output_list1.extend(resultA)
    

    Keep in mind that the order of the keyword arguments has been swapped in listCompare for this to work (list_B comes before list_A). The chunking is then done using chunksize parameter (admittedly, a very crude version of it, you may want to fine tune it).

    Alternatively, if you want to use starmap, then you may keep using your chunk method like below:

    def listCompare(list_A, list_B):
        toReturn = []
        for item in list_A:
            if item in list_B:
                toReturn.append(item)
        return toReturn
    
    with multiprocessing.Pool() as pool:
        chunks = chunk(list_a, multiprocessing.cpu_count())
        for resultA in pool.starmap(listCompare, [(c, list_b1) for c in chunks]):
            output_list1.extend(resultA)
    

    Since you already have chunked the iterable on your own, you no longer need to pass a chunksize argument.

    As a sidenote, if the iterable you pass to the pool is too long, you may want to use imap instead to lazily iterate rather than storing the whole iterable in memory.