I'm executing a parallelized function using `Pool.starmap`. According to the tqdm library, the function itself only takes 6.5 minutes to execute, but the program keeps running for about 20 more minutes before it finishes. The function processes and applies filters to strings in some columns of a pandas DataFrame. Could a different parallelization approach perform better? Is there something wrong with the `starmap` function?
The function to be executed:
def get_best_string_filters(hst, apolnmar, apolnmod, apolnsub, apolnterm,
                            amodnanu, ps, cc, cilindros, combustible, gearbox,
                            year, search_model, search_version, search_container):
    """Resolve the best (model, version, container) search strings for one row.

    Filters the global `table_ecode` down to the rows matching this HST,
    narrows further with the numeric/categorical attributes, then delegates
    to `find_best_combination` for the best string match.

    Returns a list: the five identifier columns followed by the resolved
    search_model, search_version and search_container.
    """
    # NOTE(review): assumes `year` ends with a 4-digit year (e.g. "01/2015")
    # — confirm against the caller's data.
    year = int(year[-4:])
    candidates = table_ecode[(table_ecode.HST == hst)]
    candidates = initial_selection(candidates, ps, cc, cilindros,
                                   combustible, gearbox, year)
    narrowed = get_starting_selection(candidates.copy(), search_model, "HTB")
    if narrowed.empty:
        # The model string produced no rows: search over all three fields.
        search_model, search_version, search_container = find_best_combination(
            candidates, search_model, search_version, search_container)
    else:
        # The model string matched: keep the narrowed selection and refine
        # only version/container (the model is passed as an empty string).
        candidates = narrowed.copy()
        _, search_version, search_container = find_best_combination(
            candidates, "", search_version, search_container)
    return [apolnmar, apolnmod, apolnsub, apolnterm, amodnanu,
            search_model, search_version, search_container]
The `starmap` call:
def _call_get_best_string_filters(args):
    """Unpack one parameter tuple for imap (imap passes a single argument)."""
    return get_best_string_filters(*args)

if not exists("dict_search_ammo_make_version_fixed.npy"):
    # One plain tuple per DataFrame row; no need to unpack and repack
    # fifteen throwaway names.
    params = [tuple(row) for row in values_to_change.values]
    with Pool(mp.cpu_count()) as ex:
        # BUG FIX: the original wrapped tqdm around the *input* of starmap
        # (ex.starmap(f, tqdm(params))), so the bar only measured how fast
        # arguments were consumed/submitted — it hit 100% long before the
        # workers finished, which is why the script seemed to "hang" for
        # ~20 more minutes. imap yields results as they complete (in order),
        # so tqdm over the result iterator tracks real progress.
        array_split_ammo_make_version = list(
            tqdm(ex.imap(_call_get_best_string_filters, params),
                 total=len(params))
        )
    dict_split_ammo_make_version = array_to_dict(array_split_ammo_make_version)
    # save the dict to disk for faster future executions
    np.save("dict_search_ammo_make_version_fixed.npy", dict_split_ammo_make_version)
else:
    dict_split_ammo_make_version = np.load('dict_search_ammo_make_version_fixed.npy', allow_pickle='TRUE').item()
tqdm reports 6.5 minutes and a completed status, yet the script continues to run for another 20 minutes:
In the demos below, the generator function `params` simulates slowly producing arguments for the worker function `foo`, which simply returns what it was passed — either a whole list (when using `imap`) or the individual elements of a list passed as separate arguments.
Using imap
import time
def foo(payload):
    """Worker: simulate 10 seconds of work, then echo the argument back."""
    time.sleep(10)
    return payload
if __name__ == '__main__':
    from tqdm import tqdm
    from multiprocessing import Pool

    def params():
        """Yield a growing argument list once per second (slow producer)."""
        for size in range(1, 9):
            time.sleep(1)
            yield list(range(size))

    with Pool() as ex:
        # Wrapping tqdm around the *output* iterator means the bar advances
        # as results arrive — it measures completion, not submission.
        results = list(tqdm(ex.imap(foo, params()), total=8))
        print(results)
Using apply_async
import time
def foo(*args):
    """Worker: simulate 10 seconds of work, then return the args tuple."""
    time.sleep(10)
    return args
if __name__ == '__main__':
    from tqdm import tqdm
    from multiprocessing import Pool

    def params():
        """Yield a growing argument list once per second (slow producer)."""
        for i in range(1, 9):
            time.sleep(1)
            yield list(range(i))

    def my_callback(result):
        # Runs in the main process as each task completes, so the bar
        # tracks actual completions rather than submissions.
        bar.update(1)

    with Pool() as ex, tqdm(total=8) as bar:
        # Submit everything; the callback advances the progress bar.
        async_results = [ex.apply_async(foo, param, callback=my_callback)
                         for param in params()]
        # FIX: removed the dead `results = []` — it was immediately
        # overwritten by the comprehension below.
        # .get() re-raises worker exceptions and preserves submission order.
        results = [async_result.get() for async_result in async_results]
        print(results)
`imap` with fixed-size tuples
import time
def foo(tpl):
    """Worker for imap: expects exactly 8 values, sleeps 10 seconds, then
    returns the product of the four pairwise sums."""
    time.sleep(10)
    # Exact-length unpack: raises ValueError unless len(tpl) == 8.
    a, b, c, d, e, f, g, h = tpl
    result = 1
    for left, right in ((a, b), (c, d), (e, f), (g, h)):
        result *= left + right
    return result
if __name__ == '__main__':
    from tqdm import tqdm
    from multiprocessing import Pool

    def params():
        """Yield the same fixed 8-element argument list once per second."""
        for _ in range(1, 9):
            time.sleep(1)
            yield list(range(8))

    with Pool() as ex:
        # tqdm wraps the result iterator, so it advances on completions.
        progress = tqdm(ex.imap(foo, params()), total=8)
        results = list(progress)
        print(results)