I am optimizing a Gaussian mixture using maximum likelihood estimation. Originally I used the following model:
import numpy as np

def normal(x, mu, sigma):
    """
    Gaussian (normal) probability density function.

    Args:
        x (np.ndarray): Data points.
        mu (float): Mean of the distribution.
        sigma (float): Standard deviation of the distribution.

    Returns:
        np.ndarray: Probability density values.
    """
    return (1 / (np.sqrt(2 * np.pi) * sigma)) * np.exp(-0.5 * ((x - mu) / sigma) ** 2)
def model(x, a, mu1, s1, mu2, s2):
    return a * normal(x, mu1, s1) + (1 - a) * normal(x, mu2, s2)
This works great and finds a good fit in under a second. I now wanted to dynamically generate such a function for any number of peaks.
def generate_gaussian_mix(n):
    def gaussian_mix(x, *params):
        if len(params) != 3 * n - 1:
            print(params)
            raise ValueError(f"Expected {3 * n - 1} parameters, but got {len(params)}.")
        params = np.asarray(params)
        mu = params[0::3]     # Means
        sigma = params[1::3]  # Standard deviations
        a = params[2::3]      # Weights
        a = np.hstack((a, 1 - np.sum(a)))
        return np.sum((a / (np.sqrt(2 * np.pi) * sigma)) * np.exp(-0.5 * ((x - mu) / sigma) ** 2))
    return np.vectorize(gaussian_mix)
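As a quick sanity check that the generated function reproduces the hand-written two-peak model (the grid and parameter values below are arbitrary illustration values, not fitted results):

x = np.linspace(-5, 10, 1000)
mix2 = generate_gaussian_mix(2)

# Layout expected by gaussian_mix: (mu1, s1, a1, mu2, s2), i.e. 3*2 - 1 = 5 values
theta = (0.0, 1.0, 0.3, 4.0, 2.0)

y_generated = mix2(x, *theta)
y_explicit = model(x, 0.3, 0.0, 1.0, 4.0, 2.0)
print(np.allclose(y_generated, y_explicit))  # expect True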
This model takes over three minutes to compute on my laptop with the same number of peaks and events. What optimization steps could I take to reduce the runtime of this second function? Is there a good way to avoid vectorization? Do you have any ideas for avoiding the repeated slicing?
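To make the "repeated slicing" concrete, one alternative I could try is to reshape the parameter vector once instead of taking three strided slices. This is only a sketch (a hypothetical helper, unpack_params, using the same parameter layout as above), and I have not measured whether the slicing actually matters:

def unpack_params(params, n):
    # Hypothetical helper: unpack (mu1, s1, a1, ..., mu_n, s_n) without strided slicing.
    p = np.asarray(params)
    head = p[:3 * (n - 1)].reshape(n - 1, 3)          # rows of (mu_i, sigma_i, a_i)
    mu = np.append(head[:, 0], p[-2])                 # all n means
    sigma = np.append(head[:, 1], p[-1])              # all n standard deviations
    a = np.append(head[:, 2], 1 - head[:, 2].sum())   # weights, last one inferred
    return mu, sigma, a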
For completeness, this is the optimization function:
def neg_log_event_likelyhood(model, event, theta):
    x = -np.log(model(event, *theta))
    return x
from scipy.optimize import dual_annealing

def fit_distribution_anneal(model, events, bounds, data_range=None, **kwargs):
    def total_log_likelyhood(theta, model, events):
        return np.sum(neg_log_event_likelyhood(model, events, theta))

    if data_range is not None:
        events = np.copy(events)
        events = events[np.logical_and(events > data_range[0], events < data_range[1])]

    result = dual_annealing(total_log_likelyhood, bounds, args=(model, events), **kwargs)
    params = result.x
    return params
The annealing is required (as opposed to minimize) due to the non-convex nature of the problem.
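Here is roughly how the pieces fit together on synthetic data. This is only a sketch: the sample sizes, true parameters, bounds, seed, and maxiter below are made up for illustration.

rng = np.random.default_rng(0)

# Hypothetical two-component sample: 70% from N(0, 1), 30% from N(4, 2)
events = np.concatenate((rng.normal(0.0, 1.0, 700), rng.normal(4.0, 2.0, 300)))

mix2 = generate_gaussian_mix(2)

# Bounds for (mu1, s1, a1, mu2, s2); kwargs are forwarded to dual_annealing
bounds = [(-2, 2), (0.1, 3), (0.0, 1.0), (2, 6), (0.1, 5)]

params = fit_distribution_anneal(mix2, events, bounds, maxiter=100)
print(params)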
As suspected, the primary problem was np.vectorize. By using np.transpose to turn x into a column vector, I can rely on NumPy broadcasting to evaluate the normal distribution element-wise and then sum over the appropriate axis of the array. The following is the optimized code:
def generate_gaussian_mix(n):
    """
    Dynamically generates a function for the superposition of `n` Gaussian functions.

    Args:
        n (int): Number of Gaussian functions to include in the superposition.

    Returns:
        function: A callable `f(x, *params)` where `params` is a flat array of size 3*n - 1,
            interleaved as (mean, standard deviation, weight) per component; the weight of
            the last component is inferred as one minus the sum of the other weights.
    """
    def gaussian_mix(x, *params):
        if len(params) != 3 * n - 1:
            print(params)
            raise ValueError(f"Expected {3 * n - 1} parameters, but got {len(params)}.")
        params = np.asarray(params)
        mu = params[0::3]     # Means
        sigma = params[1::3]  # Standard deviations
        a = params[2::3]      # Weights (last one inferred)
        # Turn x into a column vector so broadcasting against the parameter arrays
        # evaluates every component at every data point; the weights are applied
        # outside normal(), which only takes (x, mu, sigma).
        return (
            np.sum(a * normal(np.transpose([x]), mu[:-1], sigma[:-1]), axis=1)
            + (1 - np.sum(a)) * normal(np.transpose([x]), mu[-1], sigma[-1])[:, 0]
        )
    return gaussian_mix
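To convince myself the broadcasting rewrite computes the same thing, a quick check against the hand-written two-peak model, plus one way to time it (the x grid and parameter values are arbitrary illustration values):

import timeit

x = np.linspace(-5, 10, 10_000)
theta = (0.0, 1.0, 0.3, 4.0, 2.0)   # (mu1, s1, a1, mu2, s2), arbitrary values

fast = generate_gaussian_mix(2)     # broadcasting version above

print(np.allclose(fast(x, *theta), model(x, 0.3, 0.0, 1.0, 4.0, 2.0)))  # expect True
print(timeit.timeit(lambda: fast(x, *theta), number=100))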