This is an MWE of some code I'm writing to run Monte Carlo exercises. I need to estimate models across draws, and I'm parallelizing across models. In the MWE a "model" is parametrized by just a number of draws and a seed.
I define the functions below.
import time
import pandas as pd
import numpy as np
import multiprocessing as mp

def linreg(df):
    # OLS of y on a constant and treat
    y = df[['y']].values
    x = np.hstack([np.ones((df.shape[0], 1)), df[['treat']].values])
    xx_inv = np.linalg.inv(x.T @ x)
    beta_hat = xx_inv @ (x.T @ y)
    return pd.Series(beta_hat.flat, index=['intercept', 'coef'])

def shuffle_treat(df):
    # permute the treatment column in place
    df['treat'] = df['treat'].sample(frac=1, replace=False).values
    return df

def run_analysis(draws, seed, sleep=0):
    N = 5000
    df = pd.DataFrame({'treat': np.random.choice([0, 1], size=N, replace=True)})
    df['u'] = np.random.normal(size=N)
    df['y'] = df.eval('10 + 5*treat + u')
    np.random.seed(seed)  # seed the shuffle draws
    time.sleep(sleep)
    est = [linreg(shuffle_treat(df)) for k in range(draws)]
    est = pd.concat(est, axis=0, sort=False, keys=range(draws), names=['k', 'param'])
    return est
I then test them and show that running in serial takes about the same time as running in parallel. I can confirm the jobs really do run in parallel, because if I force some sleep time there is a clear gain from parallelization. I know the bottleneck is this list comprehension: [linreg(shuffle_treat(df)) for k in range(draws)], but I don't understand why I don't get gains from parallelizing across models. I've tried parallelizing across draws instead, but the results were even worse.
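For reference, the across-draws attempt looked roughly like this (a sketch, not my exact code; run_chunk and the even split of draws are illustrative):

def run_chunk(args):
    # hypothetical helper: run a chunk of shuffle draws on a copy of df
    df, n_draws = args
    return [linreg(shuffle_treat(df.copy())) for _ in range(n_draws)]

def run_analysis_parallel_draws(draws, seed, processes=2):
    N = 5000
    df = pd.DataFrame({'treat': np.random.choice([0, 1], size=N, replace=True)})
    df['u'] = np.random.normal(size=N)
    df['y'] = df.eval('10 + 5*treat + u')
    np.random.seed(seed)
    # each worker receives a pickled copy of df; that serialization overhead,
    # on top of the contention, is presumably why this was even worse
    # (caveat: forked workers inherit the same RNG state, so draws repeat across chunks)
    with mp.Pool(processes=processes) as pool:
        chunks = pool.map(run_chunk, [(df, draws // processes)] * processes)
    return pd.concat([s for chunk in chunks for s in chunk], axis=0)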
param_list = [dict(draws=500, seed=1029), dict(draws=500, seed=1029)]
param_list_sleep = [dict(draws=500, seed=1029, sleep=5), dict(draws=500, seed=1029, sleep=5)]

def run_analysis_wrapper(params):
    run_analysis(**params)

start = time.time()
for params in param_list:
    run_analysis_wrapper(params)
end = time.time()
print(f'double run 1 process: {(end - start):.2f} sec')

start = time.time()
with mp.Pool(processes=2) as pool:
    pool.map(run_analysis_wrapper, param_list)
end = time.time()
print(f'double run 2 processes: {(end - start):.2f} sec')

start = time.time()
for params in param_list_sleep:
    run_analysis_wrapper(params)
end = time.time()
print(f'double run 1 process w/ sleep: {(end - start):.2f} sec')

start = time.time()
with mp.Pool(processes=2) as pool:
    pool.map(run_analysis_wrapper, param_list_sleep)
end = time.time()
print(f'double run 2 processes w/ sleep: {(end - start):.2f} sec')
Output:
double run 1 process: 2.52 sec
double run 2 processes: 2.94 sec
double run 1 process w/ sleep: 12.30 sec
double run 2 processes w/ sleep: 7.71 sec
For reference, the machine is a Linux-based EC2 instance where nproc --all reports 48 CPUs. I'm running in a conda environment with Python 3.9.16.
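As a side check (not part of the MWE), threadpoolctl can report which BLAS backend NumPy is linked against and how many threads it uses, assuming the package is installed:

from threadpoolctl import threadpool_info

# one entry per detected threadpool (e.g. OpenBLAS/MKL/OpenMP), with its thread count
for info in threadpool_info():
    print(info.get('internal_api'), info.get('num_threads'))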
Based on the comment by Nils Werner, I tried disabling multithreading in NumPy, and now I do get the gains from parallelization. Interestingly, the serial version is also about twice as fast, presumably because these matrices are small enough that the BLAS threading overhead was dominating the actual computation.
import time
import os

# these must be set before numpy/pandas are imported
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['MKL_NUM_THREADS'] = '1'
os.environ['OPENBLAS_NUM_THREADS'] = '1'
os.environ['NUMEXPR_MAX_THREADS'] = '1'

import pandas as pd
import numpy as np
import multiprocessing as mp

def linreg(df):
    # OLS of y on a constant and treat
    y = df[['y']].values
    x = np.hstack([np.ones((df.shape[0], 1)), df[['treat']].values])
    xx_inv = np.linalg.inv(x.T @ x)
    beta_hat = xx_inv @ (x.T @ y)
    return pd.Series(beta_hat.flat, index=['intercept', 'coef'])

def shuffle_treat(df):
    # permute the treatment column in place
    df['treat'] = df['treat'].sample(frac=1, replace=False).values
    return df

def run_analysis(draws, seed):
    N = 5000
    df = pd.DataFrame({'treat': np.random.choice([0, 1], size=N, replace=True)})
    df['u'] = np.random.normal(size=N)
    df['y'] = df.eval('10 + 5*treat + u')
    np.random.seed(seed)  # seed the shuffle draws
    est = [linreg(shuffle_treat(df)) for k in range(draws)]
    est = pd.concat(est, axis=0, sort=False, keys=range(draws), names=['k', 'param'])
    return est
draws = 500
param_list = [dict(draws=draws, seed=1029), dict(draws=draws, seed=1029)]
def run_analysis_wrapper(params):
    run_analysis(**params)

start = time.time()
for params in param_list:
    run_analysis_wrapper(params)
end = time.time()
print(f'double run 1 process: {(end - start):.2f} sec')

start = time.time()
with mp.Pool(processes=2) as pool:
    pool.map(run_analysis_wrapper, param_list)
end = time.time()
print(f'double run 2 processes: {(end - start):.2f} sec')
Output:
double run 1 process: 1.34 sec
double run 2 processes: 0.67 sec
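If you'd rather not set the environment variables globally (they only take effect if set before NumPy is imported), threadpoolctl should offer a scoped alternative; a minimal sketch of the wrapper, assuming the package is installed:

from threadpoolctl import threadpool_limits

def run_analysis_wrapper(params):
    # limit the BLAS/OpenMP threadpools inside each worker process
    with threadpool_limits(limits=1):
        run_analysis(**params)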