I am doing optimization using optimize.minimize from scipy; say the objective function is fun.
I need to run the optimization for each row of my dataframe, and currently I am using Parallel from joblib:
import pandas as pd
import time
from joblib import Parallel,delayed
import multiprocessing
from scipy.optimize import basinhopping
import numpy as np
import jax
df=pd.DataFrame()
rng = np.random.default_rng()
df[["p11", "p12", "p13"]]=rng.dirichlet(np.ones(3),size=1120)
df[["s1", "s2", "s3"]]=rng.dirichlet(np.ones(3),size=1120)
num_cores = multiprocessing.cpu_count()-1
def get_attraction(b1, b2, b3):
    c1 = 20 * b1 + 12 * b2 + 6 * b3
    c2 = 12 * b1 + 24 * b2 + 18 * b3
    c3 = 0 * b1 + 14 * b2 + 30 * b3
    return c1, c2, c3
def get_a_tau_max(p1_tau, p2_tau, p3_tau, a):
    b1_tau_l1_a1_a1 = p1_tau * (1 - a) + a
    b2_tau_l1_a1_a1 = p2_tau * (1 - a)
    b3_tau_l1_a1_a1 = p3_tau * (1 - a)
    a1_tau_l1_a1, a2_tau_l1_a1, a3_tau_l1_a1 = get_attraction(b1_tau_l1_a1_a1, b2_tau_l1_a1_a1, b3_tau_l1_a1_a1)
    b1_tau_l1_a1_a2 = p1_tau * (1 - a)
    b2_tau_l1_a1_a2 = p2_tau * (1 - a) + a
    b3_tau_l1_a1_a2 = p3_tau * (1 - a)
    a1_tau_l1_a2, a2_tau_l1_a2, a3_tau_l1_a2 = get_attraction(b1_tau_l1_a1_a2, b2_tau_l1_a1_a2, b3_tau_l1_a1_a2)
    b1_tau_l1_a1_a3 = p1_tau * (1 - a)
    b2_tau_l1_a1_a3 = p2_tau * (1 - a)
    b3_tau_l1_a1_a3 = p3_tau * (1 - a) + a
    a1_tau_l1_a3, a2_tau_l1_a3, a3_tau_l1_a3 = get_attraction(b1_tau_l1_a1_a3, b2_tau_l1_a1_a3, b3_tau_l1_a1_a3)
    a_tau_max = max(a1_tau_l1_a1, a2_tau_l1_a2, a3_tau_l1_a3)
    return a_tau_max
def get_signal_conditional_at(at, b1, b2, b3, a):
    if at == 1:
        b1_tau = b1 * (1 - a) + a
        b2_tau = b2 * (1 - a)
        b3_tau = b3 * (1 - a)
    elif at == 2:
        b1_tau = b1 * (1 - a)
        b2_tau = b2 * (1 - a) + a
        b3_tau = b3 * (1 - a)
    else:
        b1_tau = b1 * (1 - a)
        b2_tau = b2 * (1 - a)
        b3_tau = b3 * (1 - a) + a
    return b1_tau, b2_tau, b3_tau
def get_belief(index, row):
    # Exponentially weighted average of the signal history, evaluated for each
    # of the three candidate signals of the current row.
    if index <= 1118:
        df_mask = pd.DataFrame()
        df_mask["weight"] = [0.8 ** i for i in range(0, index + 1)][::-1]
        dem = df_mask["weight"].sum()
        subdf = df.iloc[:index].copy()
        s1_history = subdf["s1"].values.tolist()
        s2_history = subdf["s2"].values.tolist()
        s3_history = subdf["s3"].values.tolist()
        belief = []
        for at in [[row["b11"], row["b12"], row["b13"]],
                   [row["b21"], row["b22"], row["b23"]],
                   [row["b31"], row["b32"], row["b33"]]]:
            df_mask["s1"] = s1_history + [at[0]]
            df_mask["s2"] = s2_history + [at[1]]
            df_mask["s3"] = s3_history + [at[2]]
            nom1 = (df_mask["weight"] * df_mask["s1"]).sum()
            nom2 = (df_mask["weight"] * df_mask["s2"]).sum()
            nom3 = (df_mask["weight"] * df_mask["s3"]).sum()
            b1 = nom1 / dem
            b2 = nom2 / dem
            b3 = nom3 / dem
            belief += [b1, b2, b3]
        return belief
    else:
        return [0, 0, 0, 0, 0, 0, 0, 0, 0]
def get_prob(x, index, row, lamda, a):
    p1, p2, p3 = x[0], x[1], x[2]
    p11 = row["p11"]
    p12 = row["p12"]
    p13 = row["p13"]
    b1 = p11 * (1 - a) + p1 * a
    b2 = p12 * (1 - a) + p2 * a
    b3 = p13 * (1 - a) + p3 * a
    c1, c2, c3 = get_attraction(b1, b2, b3)
    row["b11"], row["b12"], row["b13"] = get_signal_conditional_at(1, p11, p12, p13, a)
    row["b21"], row["b22"], row["b23"] = get_signal_conditional_at(2, p11, p12, p13, a)
    row["b31"], row["b32"], row["b33"] = get_signal_conditional_at(3, p11, p12, p13, a)
    belief = get_belief(index, row)
    t1 = get_a_tau_max(belief[0], belief[1], belief[2], a)
    t2 = get_a_tau_max(belief[3], belief[4], belief[5], a)
    t3 = get_a_tau_max(belief[6], belief[7], belief[8], a)
    a1 = c1 + t1
    a2 = c2 + t2
    a3 = c3 + t3
    nom1 = np.exp(lamda * a1)
    nom2 = np.exp(lamda * a2)
    nom3 = np.exp(lamda * a3)
    dem = nom1 + nom2 + nom3
    p1_t = nom1 / dem
    p2_t = nom2 / dem
    p3_t = nom3 / dem
    return (p1_t - p1) ** 2 + (p2_t - p2) ** 2 + (p3_t - p3) ** 2
def get_root(index, row, lamda, a):
    fun = get_prob
    # jac_ = jax.jacfwd(fun)
    result = basinhopping(fun, x0=[0.0, 0.25, 0.75], niter=50, interval=10, seed=0,
                          minimizer_kwargs={'args': (index, row, lamda, a),  # 'jac': jac_,
                                            'method': "SLSQP",
                                            'tol': 1.0e-4,
                                            'bounds': [(0.0, 1.0), (0.0, 1.0), (0.0, 1.0)],
                                            'constraints': {"type": 'eq',
                                                            'fun': lambda x: 1 - x[0] - x[1] - x[2],
                                                            'jac': lambda x: np.full_like(x, -1)}})
    x = np.append(result.x, index)
    return x
start = time.time()
p_sv = Parallel(n_jobs=num_cores)(delayed(get_root)(index=index, row=row, lamda=0.5, a=7/13)
                                  for index, row in df.iterrows())
print("elapsed:", time.time() - start)
elapsed: 240.9707179069519
Each optimization is independent; there is no information exchange between the rows.
It takes very long to finish the optimization for the whole dataset, and it only uses about 30% of the CPU (I have an M1 Pro).
My question is: how many n_jobs should I set (or is there some other way, such as changing the backend) to make it use 100% of the CPU so that I can finish this program faster?
I have access to a computer cluster with 2 CPUs and 64 cores per CPU. I have tried setting n_jobs=64, but it does not provide a significant improvement (I am still learning how to utilize both CPUs...).
Update:
Now I have found that providing the Jacobian of the objective function slows down the optimization. With 80 rows it takes 64 seconds with jac_ and 20 seconds without it (and the CPU goes to almost 100%). But why? I thought providing the Jacobian would make it faster.
Update:
I have added fun and a df with 1120 rows. If I do not use jac_, it is about 5 times faster on the cluster.
This is not a multi-processing issue but an algorithmic-complexity one. The function get_prob you are trying to optimize depends on get_belief, which has worst-case linear complexity with respect to the index number: for row i it rebuilds and reduces a weighted history of about i entries, and it does so on every objective evaluation. The consequence is that your program has quadratic complexity overall, which can be a huge slowdown.
In itself, the function get_belief is very slow. The problem is that get_prob (and thus get_belief) is called many times by the optimizer during the optimization process, slowing down the whole execution. If you remove it or replace it with a constant value (for instance, replace index <= 1118 by False to short-circuit the expensive code path), you will see that your program is significantly faster. On my computer this resulted in a 20x speedup.
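Concretely, that measurement amounts to temporarily replacing the definition of get_belief in the script above with a stub like the following (measurement only; the returned beliefs are then meaningless):
def get_belief(index, row):
    # Measurement-only stub: short-circuit the expensive weighted-history loop
    # to see how much of the total runtime it accounts for.
    return [0, 0, 0, 0, 0, 0, 0, 0, 0]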
However, one can note that get_belief does not depend on the variable x of the optimized function fun, but only on the row parameters. This means that you can pre-compute get_belief for each row of your dataframe instead of computing it inside the optimized function, improving the execution speed by a large margin.
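Here is a minimal sketch of that pre-computation, reusing the definitions from the question; the names get_prob_fast and get_root_fast are mine, and I have also hoisted the t1, t2, t3 terms out of the objective since they only depend on the pre-computed beliefs, not on x:
def get_prob_fast(x, row, lamda, a, t1, t2, t3):
    # Same objective as get_prob, but the x-independent terms t1, t2, t3 are
    # passed in instead of being recomputed on every evaluation.
    p1, p2, p3 = x[0], x[1], x[2]
    b1 = row["p11"] * (1 - a) + p1 * a
    b2 = row["p12"] * (1 - a) + p2 * a
    b3 = row["p13"] * (1 - a) + p3 * a
    c1, c2, c3 = get_attraction(b1, b2, b3)
    nom1 = np.exp(lamda * (c1 + t1))
    nom2 = np.exp(lamda * (c2 + t2))
    nom3 = np.exp(lamda * (c3 + t3))
    dem = nom1 + nom2 + nom3
    return (nom1 / dem - p1) ** 2 + (nom2 / dem - p2) ** 2 + (nom3 / dem - p3) ** 2

def get_root_fast(index, row, lamda, a):
    # Pre-compute everything that does not depend on x once per row ...
    row["b11"], row["b12"], row["b13"] = get_signal_conditional_at(1, row["p11"], row["p12"], row["p13"], a)
    row["b21"], row["b22"], row["b23"] = get_signal_conditional_at(2, row["p11"], row["p12"], row["p13"], a)
    row["b31"], row["b32"], row["b33"] = get_signal_conditional_at(3, row["p11"], row["p12"], row["p13"], a)
    belief = get_belief(index, row)
    t1 = get_a_tau_max(belief[0], belief[1], belief[2], a)
    t2 = get_a_tau_max(belief[3], belief[4], belief[5], a)
    t3 = get_a_tau_max(belief[6], belief[7], belief[8], a)
    # ... then run the same optimizer on the now-cheap objective.
    result = basinhopping(get_prob_fast, x0=[0.0, 0.25, 0.75], niter=50, interval=10, seed=0,
                          minimizer_kwargs={'args': (row, lamda, a, t1, t2, t3),
                                            'method': "SLSQP",
                                            'tol': 1.0e-4,
                                            'bounds': [(0.0, 1.0), (0.0, 1.0), (0.0, 1.0)],
                                            'constraints': {"type": 'eq',
                                                            'fun': lambda x: 1 - x[0] - x[1] - x[2],
                                                            'jac': lambda x: np.full_like(x, -1)}})
    return np.append(result.x, index)

p_sv = Parallel(n_jobs=num_cores)(delayed(get_root_fast)(index=index, row=row, lamda=0.5, a=7/13)
                                  for index, row in df.iterrows())
With this change basinhopping only evaluates the cheap closed-form part of the objective, so the quadratic history pass runs once per row instead of once per objective evaluation.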