I cannot collect the output of all Pool processes. The resulting dictionary seems to contain the output of only 1 of the 3 parallel processes (288 of 411 elements). Here is my code.
from itertools import repeat
from multiprocessing import Pool, Manager

def reader(entry, outcomes): #other args are constant
    .............................................
    #Some condition:
    prediction = min(distances) + (mlc_data[min(distances)[1]],)
    outcomes[entry_pairs[' '.join(entry)]] = prediction
    return outcomes

manager = Manager()
outcomes = manager.dict()
with Pool(3) as p:
    output = p.starmap(reader, zip(input[0], repeat(outcomes)))
    p.close()
    p.join()
In the end, I have only 288 of the 411 elements in the outcomes dictionary. On the other hand, I have 411**2 elements in output, which probably means that my code is not well optimized.
You are returning the entire, ever-growing outcomes dictionary with each call to reader. That dictionary is serialized, sent to the parent process, deserialized and then added to output. That is a huge cost, especially considering that the manager is also moving the data between processes to keep the dict in sync. reader could return prediction and its key instead, and the parent process can build the result dictionary. This also lets you test all of the returned keys to make sure there are no duplicates.
from multiprocessing import Pool

def reader(entry): #other args are constant
    #Some condition:
    prediction = (min_d := min(distances)) + (mlc_data[min_d[1]],)
    return entry_pairs[' '.join(entry)], prediction

outcomes = {}
with Pool(3) as p:
    for key, prediction in p.map(reader, input[0]):
        if key in outcomes:
            print("ERROR, key duplicated:", key)
        else:
            outcomes[key] = prediction

Since reader now takes a single argument and no longer receives the shared dict, the plain p.map call replaces starmap and the repeat import is no longer needed.
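If it helps to see the pattern in isolation, here is a minimal, self-contained sketch with toy data (the worker function and the squaring are made up for illustration, not taken from your code): each worker returns a (key, value) pair, only the parent touches the result dictionary, and no Manager is involved. imap_unordered yields results as they complete, which also keeps memory low.

from multiprocessing import Pool

def worker(entry):
    # Toy computation standing in for your distance logic:
    # return a (key, value) pair; the parent assembles the dictionary.
    return entry, entry ** 2

if __name__ == '__main__':
    items = range(411)
    results = {}
    with Pool(3) as p:
        for key, value in p.imap_unordered(worker, items):
            if key in results:
                print("ERROR, key duplicated:", key)
            else:
                results[key] = value
    print(len(results))  # 411 -- one result per input item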