python pandas multiprocessing pool starmap

Execution takes too long to finish when using Pool.starmap


I'm executing a parallelized function using the Pool.starmap function. According to the tqdm library, the execution of the function itself takes only 6.5 minutes, but the program keeps running for about 20 more minutes before it finishes. The function processes and applies filters to some strings in some columns of a pandas DataFrame. Could a different parallelized function perform better? Is there something wrong with the starmap function?

Function to be executed:

def get_best_string_filters(hst, apolnmar, apolnmod, apolnsub, apolnterm, amodnanu, ps, cc, cilindros, combustible, gearbox, year, search_model, search_version, search_container):
    """Choose the model/version/container search strings for one record.

    The rows of the module-level ``table_ecode`` whose ``HST`` column equals
    ``hst`` are narrowed with ``initial_selection`` and then matched against
    ``search_model`` through ``get_starting_selection``.  Depending on whether
    that match is empty, ``find_best_combination`` is asked to refine either
    all three search strings or only the version and container.

    Only the last four characters of ``year`` are used, converted to ``int``.

    Returns:
        A list holding ``apolnmar``, ``apolnmod``, ``apolnsub``, ``apolnterm``
        and ``amodnanu`` unchanged, followed by the resulting ``search_model``,
        ``search_version`` and ``search_container``.
    """
    rows_for_hst = table_ecode[(table_ecode.HST == hst)]
    model_year = int(year[-4:])

    candidates = initial_selection(rows_for_hst, ps, cc, cilindros, combustible, gearbox, model_year)

    # Try the model string on a copy so `candidates` stays available as a fallback.
    model_matches = get_starting_selection(candidates.copy(), search_model, "HTB")

    if not model_matches.empty:
        # The model string matched: keep it and refine only version and container.
        narrowed = model_matches.copy()
        _, search_version, search_container = find_best_combination(narrowed, "", search_version, search_container)
    else:
        # No match on the model string: let the helper pick all three strings.
        search_model, search_version, search_container = find_best_combination(candidates, search_model, search_version, search_container)

    return [apolnmar, apolnmod, apolnsub, apolnterm, amodnanu, search_model, search_version, search_container]

starmap call:

# Build the lookup dictionary once and cache it on disk; later runs only load it.
if not exists("dict_search_ammo_make_version_fixed.npy"):
    # One positional-argument tuple per row, in the order get_best_string_filters expects.
    params = [tuple(row) for row in values_to_change.values]
    with Pool(mp.cpu_count()) as ex:
        # NOTE: tqdm wraps the *argument* list here, so the bar counts arguments
        # taken by the pool, not finished results; starmap itself only returns
        # once every result is available.
        array_split_ammo_make_version = ex.starmap(get_best_string_filters, tqdm(params, total=len(params)))
        dict_split_ammo_make_version = array_to_dict(array_split_ammo_make_version)
        # save the dict to disk for faster future executions
        np.save("dict_search_ammo_make_version_fixed.npy", dict_split_ammo_make_version)
else:
    # np.save stored the dict as a pickled object; .item() unwraps it again.
    # allow_pickle must be the boolean True (any non-empty string is merely truthy).
    # Unpickling can run arbitrary code, so only load files written by this script.
    dict_split_ammo_make_version = np.load('dict_search_ammo_make_version_fixed.npy', allow_pickle=True).item()

tqdm reports 6.5 minutes and a completed status, but the script keeps running for another 20 minutes: Execution image


Solution

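  • In the demos below, the generator function params simulates slowly producing the arguments for the worker function foo. In the first two demos foo simply returns what it was passed: a single list when using imap, or the elements of that list as individual arguments when using apply_async. In every demo the progress bar is driven by completed results rather than by the input iterable, so it reflects the work that has actually finished.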

    Using imap

    import time

    def foo(the_list):
        """Worker: sleep 10 seconds to simulate work, then return the argument unchanged."""
        time.sleep(10)
        return the_list

    if __name__ == '__main__':
        from multiprocessing import Pool
        from tqdm import tqdm

        def params():
            """Yield list(range(1)) through list(range(8)), pausing one second before each."""
            for stop in range(1, 9):
                time.sleep(1)
                yield list(range(stop))

        with Pool() as pool:
            # tqdm wraps the *result* iterator returned by imap, so the bar
            # advances once per finished task; list() drains it in input order.
            results = list(tqdm(pool.imap(foo, params()), total=8))
        print(results)
    

    Using apply_async

    import time

    def foo(*args):
        """Worker: sleep 10 seconds to simulate work, then return the positional arguments as a tuple."""
        time.sleep(10)
        return args

    if __name__ == '__main__':
        from tqdm import tqdm
        from multiprocessing import Pool

        def params():
            """Yield list(range(1)) through list(range(8)), pausing one second before each."""
            for i in range(1, 9):
                time.sleep(1)
                yield list(range(i))

        def my_callback(result):
            """Advance the progress bar by one when a task's result arrives."""
            # `bar` is the tqdm instance bound by the `with` statement below.
            bar.update(1)

        with Pool() as ex, tqdm(total=8) as bar:
            # Each yielded list is unpacked into foo's positional arguments.
            async_results = [ex.apply_async(foo, param, callback=my_callback) for param in params()]
            # get() blocks until the corresponding task has finished.
            results = [async_result.get() for async_result in async_results]
        print(results)
    

    imap with fixed-size tuples

    import time

    def foo(tpl):
        """Worker: sleep 10 seconds to simulate work, then combine the eight values in ``tpl``.

        ``tpl`` must unpack into exactly eight items; the result is
        ``(a + b) * (c + d) * (e + f) * (g + h)``.
        """
        time.sleep(10)
        # unpack:
        a, b, c, d, e, f, g, h = tpl
        return (a + b) * (c + d) * (e + f) * (g + h)


    if __name__ == '__main__':
        from tqdm import tqdm
        from multiprocessing import Pool

        def params():
            """Yield the tuple (0, 1, ..., 7) eight times, pausing one second before each."""
            for _ in range(8):
                time.sleep(1)
                yield tuple(range(8))

        with Pool() as ex:
            # imap passes each yielded tuple to foo as a single argument.
            it = ex.imap(foo, params())
            # Wrapping the result iterator makes the bar advance per finished task.
            results = list(tqdm(it, total=8))
        print(results)