Lately I have been trying to parallelize some of my code (for speed) by using the `multiprocessing` library inside a class that itself relies on a user-supplied function held in one of its attributes. It doesn't work at all.
Context: I'm trying to parallelize a "parallel genetic algorithms" class which, as the name strongly implies, is in itself an embarrassingly parallel problem.
As far as I can tell, there are two problems in my code. (1) The user-supplied fitness function is not exported to the processes created by the `Pool` object, and no amount of deep copying seems able to resolve it. (2) The other issue is that, perhaps, the `Pool` object is unsure how to handle multi-dimensional outputs... I'm really unsure about this one, to be honest.
I've tried to work out a tiny, standalone version of my code to make things clearer (it doesn't currently run, because of the bugs, which is kind of the point):

    from itertools import repeat
    import multiprocessing as mp
    import numpy as np


    class GeneticAlgorithm:
        def __init__(self, I, G, D, U, fitness_function, run_parallel):
            self.fitness_function = fitness_function  # User-supplied fitness function
            self.D = D  # Problem size (number of genes)
            self.I = I  # Population size (number of individuals)
            self.G = G  # Number of generations
            self.U = U  # Number of parallel populations
            self.run_parallel = run_parallel

        def sga(self):
            '''One single-threaded genetic algorithm'''
            # Activation rate is fixed at 0.5 for the sake of this MWE
            pop = np.random.binomial(size=self.I * self.D, n=1, p=0.5).reshape(self.I, self.D)  # Seed population
            for g in range(self.G):
                # fitness is computed for all individuals
                fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
                # fitness is scaled back to 100%
                fitpop /= np.sum(fitpop)
                # 2I parents are selected at random according to each individual's relative fitness score
                parents = np.random.choice(range(self.I), size=2 * self.I, replace=True, p=fitpop).reshape(self.I, 2)
                # Parents are crossed 2 by 2, with each pair producing exactly one offspring
                crossover = np.array([np.random.choice(parents[i, :], size=self.D, replace=True) for i in range(self.I)]).reshape(self.I, self.D)
                embryos = np.array([[pop[crossover[i, d], d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I, self.D)
                # Mutation rate is fixed at 1/D for the sake of this MWE
                mutations = np.random.binomial(size=self.I * self.D, n=1, p=1 / self.D).reshape(self.I, self.D)
                # "Mutated embryos" become fully fledged individuals and replace the parent generation
                pop = (1 - mutations) * embryos + mutations * (1 - embryos)
            # Individuals are aggregated gene-wise, with the average of active and inactive genes creating a ratio
            return pop.mean(axis=0)

        def pga(self):
            '''Multiple parallel genetic algorithms'''
            if self.run_parallel:
                p = mp.Pool(mp.cpu_count())
                universes = p.starmap(GeneticAlgorithm.sga, zip(repeat(self, self.U)))
                p.close()
                p.join()
            else:
                universes = np.zeros(self.U * self.D).reshape(self.U, self.D)
                for u in range(self.U):
                    universes[u, :] = self.sga()
            # Multiple GAs are aggregated in a sort of "mean of means"
            return universes.mean(axis=0)


    if __name__ == '__main__':
        def my_fitness_function(ind):
            '''Dummy fitness function, scores all individual equally...'''
            return 1.0

        # Dummy test to check if the code runs... it doesn't :(
        ga = GeneticAlgorithm(I=10, G=3, D=5, U=10, fitness_function=my_fitness_function, run_parallel=True)
        print(ga.pga())

Any kind of hint, piece of code or complete solution would be most welcome. This used to be fairly easy in R, but with Python I'm apparently at my wits' end... thanks!
ETA: Fixed a few typos in the code and added the `run_parallel` argument to show that it runs perfectly fine without parallelization. Oh yeah, also, I run on Windows, otherwise I might have tried that Ray library which, I'm told, works wonders, especially when compared to `multiprocessing`.
See my comment to your post. Also, each process in the pool should seed the random number generator uniquely. Otherwise, they would be generating the same sequence of random numbers.
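As a rough sketch of one way to get distinct random streams, NumPy's `SeedSequence.spawn` can derive one independent child seed per task, and each task can then build its own generator with `np.random.default_rng`. The names `make_task_seeds`, `seed_population` and the value `root_seed=12345` are made up for illustration and do not appear in the question; the sketch only covers the seeding step, not the whole of `sga`.

```python
import numpy as np


def make_task_seeds(n_tasks, root_seed=12345):
    """Return n_tasks independent child seeds derived from one root seed.

    root_seed is an arbitrary example value, not something from the question.
    """
    return np.random.SeedSequence(root_seed).spawn(n_tasks)


def seed_population(seed_seq, n_individuals=10, n_genes=5):
    """Hypothetical stand-in for the first line of sga(): draw a random 0/1 population."""
    rng = np.random.default_rng(seed_seq)  # private generator for this task
    return rng.binomial(n=1, p=0.5, size=(n_individuals, n_genes))


if __name__ == "__main__":
    seeds = make_task_seeds(3)
    for pop in (seed_population(s) for s in seeds):
        # Each task draws from its own stream, so the gene-wise means differ.
        print(pop.mean(axis=0))
```

Under this approach the child seeds would be passed along as task arguments, so the results no longer depend on which worker process happens to pick up which task, and a fixed root seed makes the whole run repeatable.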
Note that multiprocessing will not necessarily run faster, because of the overhead of creating the process pool and of passing arguments and results from one process's address space to another. Your worker function, `sga`, has to be sufficiently CPU-intensive for multiprocessing to be advantageous, which, as I now understand, it actually is.

    from itertools import repeat
    import multiprocessing as mp
    import os

    import numpy as np


    # This will be executed once by each process in the pool:
    def init_pool():
        # Seed from the process ID: it differs between worker processes and
        # fits in the 32-bit range that np.random.seed accepts.
        np.random.seed(os.getpid())


    def my_fitness_function(ind):
        '''Dummy fitness function, scores all individuals equally...'''
        return 1.0


    class GeneticAlgorithm:
        def __init__(self, I, G, D, U, fitness_function, run_parallel):
            self.fitness_function = fitness_function  # User-supplied fitness function
            self.D = D  # Problem size (number of genes)
            self.I = I  # Population size (number of individuals)
            self.G = G  # Number of generations
            self.U = U  # Number of parallel populations
            self.run_parallel = run_parallel

        def sga(self):
            '''One single-threaded genetic algorithm'''
            # Activation rate is fixed at 0.5 for the sake of this MWE
            pop = np.random.binomial(size=self.I * self.D, n=1, p=0.5).reshape(self.I, self.D)  # Seed population
            for g in range(self.G):
                # fitness is computed for all individuals
                fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
                # fitness is scaled back to 100%
                fitpop /= np.sum(fitpop)
                # 2I parents are selected at random according to each individual's relative fitness score
                parents = np.random.choice(range(self.I), size=2 * self.I, replace=True, p=fitpop).reshape(self.I, 2)
                # Parents are crossed 2 by 2, with each pair producing exactly one offspring
                crossover = np.array([np.random.choice(parents[i, :], size=self.D, replace=True) for i in range(self.I)]).reshape(self.I, self.D)
                embryos = np.array([[pop[crossover[i, d], d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I, self.D)
                # Mutation rate is fixed at 1/D for the sake of this MWE
                mutations = np.random.binomial(size=self.I * self.D, n=1, p=1 / self.D).reshape(self.I, self.D)
                # "Mutated embryos" become fully fledged individuals and replace the parent generation
                pop = (1 - mutations) * embryos + mutations * (1 - embryos)
            # Individuals are aggregated gene-wise, with the average of active and inactive genes creating a ratio
            return pop.mean(axis=0)

        def pga(self):
            '''Multiple parallel genetic algorithms'''
            universes = np.zeros(self.U * self.D).reshape(self.U, self.D)
            if self.run_parallel:
                # No point in starting more workers than there are populations
                pool_size = min(mp.cpu_count(), self.U)
                p = mp.Pool(pool_size, initializer=init_pool)
                results = p.starmap(GeneticAlgorithm.sga, zip(repeat(self, self.U)))
                p.close()
                p.join()
                # starmap returns a list, so copy each result into the array
                for u in range(self.U):
                    universes[u, :] = results[u]
            else:
                for u in range(self.U):
                    universes[u, :] = self.sga()
            # Multiple GAs are aggregated in a sort of "mean of means"
            return universes.mean(axis=0)


    if __name__ == '__main__':
        # Dummy test to check that the code runs
        ga = GeneticAlgorithm(I=10, G=3, D=5, U=10, fitness_function=my_fitness_function, run_parallel=True)
        print(ga.pga())

Prints:
[0.56 0.46 0.38 0.54 0.52]
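
A note on why `my_fitness_function` sits at module level in the code above. The pool sends each task to a worker by pickling it, and a function is pickled as a reference (module plus name), not as code. On Windows the workers are started with the spawn method and re-import the main module without running what is under the `if __name__ == '__main__':` guard, so a function defined there cannot be found again on the worker side. The sketch below only illustrates the "pickled by name" part in a single process; `can_pickle` and `Holder` are hypothetical names introduced for this example.

```python
import math
import pickle


def can_pickle(obj):
    """Return True if pickle.dumps accepts obj (tasks sent to a Pool are pickled)."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


class Holder:
    """Hypothetical stand-in for GeneticAlgorithm: keeps a user function as an attribute."""

    def __init__(self, fitness_function):
        self.fitness_function = fitness_function


if __name__ == "__main__":
    print(can_pickle(math.sqrt))                # True: found again by name in module math
    print(can_pickle(lambda ind: 1.0))          # False: a lambda has no importable name
    print(can_pickle(Holder(lambda ind: 1.0)))  # False: the attribute is pickled with the instance
```

The last line is the relevant case here: passing `self` to the pool pickles the whole instance, including the function it holds, which is why deep copying the object beforehand makes no difference.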