Tags: python, python-3.x, pandas, numpy, multiprocessing

Why is this python code not running faster with parallelization?


This is an MWE of some code I'm writing to run Monte Carlo exercises. I need to estimate models across draws, and I'm parallelizing across models. In the MWE, a "model" is parametrized simply by a number of draws and a seed.

I define the functions below.

import time
import pandas as pd
import numpy as np
import multiprocessing as mp
    
def linreg(df):
    # OLS via the normal equations: beta_hat = (X'X)^-1 X'y,
    # with an intercept column prepended to the design matrix
    y = df[['y']].values
    x = np.hstack([np.ones((df.shape[0], 1)), df[['treat']].values])
    
    xx_inv = np.linalg.inv(x.T @ x)
    beta_hat = xx_inv @ (x.T @ y)
    
    return pd.Series(beta_hat.flat, index=['intercept', 'coef'])

def shuffle_treat(df):
    df['treat'] = df['treat'].sample(frac=1, replace=False).values
    return df
    
def run_analysis(draws, seed, sleep=0):
    
    N = 5000
    df = pd.DataFrame({'treat':np.random.choice([0,1], size=N, replace=True)})
    df['u'] = np.random.normal(size=N)
    df['y'] = df.eval('10 + 5*treat + u')

    np.random.seed(seed)
    
    time.sleep(sleep)
    
    est = [linreg(shuffle_treat(df)) for k in range(draws)]

    est = pd.concat(est, axis=0, sort=False, keys=range(draws), names=['k', 'param'])
    
    return est

I then test them and show that running in serial takes about as long as running in parallel. I can confirm the workers are running in parallel because, if I force some sleep time, there is a clear gain from parallelization. I know the bottleneck is this list comprehension: [linreg(shuffle_treat(df)) for k in range(draws)], but I don't understand why I see no gains from parallelizing across models. I've tried parallelizing across draws instead, but the results were even worse.

param_list = [dict(draws=500, seed=1029), dict(draws=500, seed=1029)]
param_list_sleep = [dict(draws=500, seed=1029, sleep=5), dict(draws=500, seed=1029, sleep=5)]

def run_analysis_wrapper(params):
    run_analysis(**params)

start = time.time()
for params in param_list:
    run_analysis_wrapper(params)
end = time.time()
print(f'double run 1 process: {(end - start):.2f} sec')

start = time.time()
with mp.Pool(processes=2) as pool:
    pool.map(run_analysis_wrapper, param_list)
end = time.time()
print(f'double run 2 processes: {(end - start):.2f} sec')

start = time.time()
for params in param_list_sleep:
    run_analysis_wrapper(params)
end = time.time()
print(f'double run 1 process w/ sleep: {(end - start):.2f} sec')

start = time.time()
with mp.Pool(processes=2) as pool:
    pool.map(run_analysis_wrapper, param_list_sleep)
end = time.time()
print(f'double run 2 processes w/ sleep: {(end - start):.2f} sec')

Output:

double run 1 process: 2.52 sec
double run 2 processes: 2.94 sec
double run 1 process w/ sleep: 12.30 sec
double run 2 processes w/ sleep: 7.71 sec

For reference, the machine is a Linux-based EC2 instance with nproc --all showing 48 CPUs. I'm running inside a conda environment with Python 3.9.16.
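The same CPU count can be checked from inside Python, which is handy when sizing the pool:

```python
import os
import multiprocessing as mp

# Logical CPUs visible to the OS (the same figure `nproc --all` reports)
print(os.cpu_count())

# multiprocessing's view, normally identical
print(mp.cpu_count())
```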


Solution

  • Based on the comment by Nils Werner, I tried disabling multithreading in NumPy, and now I get the gains from parallelization. Interestingly, the serial version is also about twice as fast.

    import time
    import os
    # These must be set before NumPy/pandas are first imported: the
    # BLAS/OpenMP libraries read them only once, when they are loaded.
    os.environ['OMP_NUM_THREADS'] = '1'
    os.environ['MKL_NUM_THREADS'] = '1'
    os.environ['OPENBLAS_NUM_THREADS'] = '1'
    os.environ['NUMEXPR_MAX_THREADS'] = '1'
    import pandas as pd
    import numpy as np
    import multiprocessing as mp
        
    def linreg(df):
        y = df[['y']].values
        x = np.hstack([np.ones((df.shape[0], 1)), df[['treat']].values])
        
        xx_inv = np.linalg.inv(x.T @ x)
        beta_hat = xx_inv @ (x.T @ y)
        
        return pd.Series(beta_hat.flat, index=['intercept', 'coef'])
    
    def shuffle_treat(df):
        df['treat'] = df['treat'].sample(frac=1, replace=False).values
        return df
        
    def run_analysis(draws, seed):
        
        N = 5000
        df = pd.DataFrame({'treat':np.random.choice([0,1], size=N, replace=True)})
        df['u'] = np.random.normal(size=N)
        df['y'] = df.eval('10 + 5*treat + u')
    
        np.random.seed(seed)
        
        est = [linreg(shuffle_treat(df)) for k in range(draws)]
    
        est = pd.concat(est, axis=0, sort=False, keys=range(draws), names=['k', 'param'])
        
        return est
    
    draws = 500
    param_list = [dict(draws=draws, seed=1029), dict(draws=draws, seed=1029)]
    
    def run_analysis_wrapper(params):
        run_analysis(**params)
    
    start = time.time()
    for params in param_list:
        run_analysis_wrapper(params)
    end = time.time()
    print(f'double run 1 process: {(end - start):.2f} sec')
    
    start = time.time()
    with mp.Pool(processes=2) as pool:
        pool.map(run_analysis_wrapper, param_list)
    end = time.time()
    print(f'double run 2 processes: {(end - start):.2f} sec')
    

    Output:

    double run 1 process: 1.34 sec
    double run 2 processes: 0.67 sec
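As an alternative to environment variables, BLAS threading can also be capped at runtime. A minimal sketch, assuming the threadpoolctl package is installed:

```python
import numpy as np
from threadpoolctl import threadpool_limits, threadpool_info

# Inspect the thread pools NumPy's BLAS backend has opened
for pool in threadpool_info():
    print(pool.get('user_api'), pool.get('num_threads'))

# A small design matrix like the one in the question: intercept + treat
x = np.hstack([np.ones((5000, 1)), np.random.choice([0, 1], size=(5000, 1))])

# All BLAS calls inside this block run single-threaded, even though
# NumPy was imported without the *_NUM_THREADS variables set
with threadpool_limits(limits=1, user_api='blas'):
    xtx_inv = np.linalg.inv(x.T @ x)

print(xtx_inv.shape)
```

Unlike the environment variables, which are only honored if set before the BLAS library is loaded, threadpool_limits works after NumPy has already been imported, so it could also be applied inside each worker via the Pool initializer argument.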