python parallel-processing openmdao matlab-engine

How can you force instances of MATLAB Engine to operate in parallel from Python within a Python function?


I am trying to run a MATLAB model from Python so that the model's design parameters can be optimized by a genetic algorithm in OpenMDAO. I want to divide the MATLAB model runs evenly among the available PC cores. For example, given 90 MATLAB model runs and 6 cores, each core would operate one instance of MATLAB Engine that handles 15 of the total model runs. I want all of the MATLAB instances to run in parallel.

The problem with my code is that it proceeds through the test matrix core-by-core. Windows task manager shows one MATLAB engine instance takes the first 15 model runs, then the second MATLAB engine takes the next 15 model runs, and so on. So there seem to be no time savings with my model calling method. How can you force the MATLAB engines to operate in parallel? I am trying something similar to the top answer post here, except the parallel operation is couched within functions as shown below.

# Open one MATLAB engine per core; each engine is later addressed by its
# position in `eng`.
n_cores = 6  # Number of available PC cores
eng = []
for i in range(n_cores):
    eng.append(matlab.engine.start_matlab())
    # Point every engine at the folder that holds the MATLAB model.
    eng[i].cd(r'C:\path_to_model', nargout=0)


# OpenMDAO genetic algorithm wraps around Python function that calls function that calls MATLAB engines
# NOTE(review): model_run_func is defined further down in this listing; it
# must already be defined by the time this line executes.
f = omf.wrap(model_run_func)

# Set up design and objective variables for optimization within OpenMDAO
prob.model.add_design_var(des_vars)
prob.model.add_objective(obj_vars)

# Run the optimization. run_driver is a method: it has to be called, a bare
# attribute reference does nothing.
prob.run_driver()

# MATLAB model caller
def model_run_func(des_vars):
    # Skeleton of the function handed to OpenMDAO: the two triple-quoted
    # strings below are placeholders for code that is not shown here.
    # NOTE(review): des_vars is not referenced in the visible body; presumably
    # it is consumed by the elided set-up step - confirm.

    '''
    Set up test cases and specify model inputs
    '''
    # Call function that divides model calls by number of PC cores
    # NOTE(review): inputs_list is not defined in the visible code; presumably
    # it is built by the elided set-up step above - confirm.
    output_list =  parallel(inputs_list)

    # As shown, nothing is returned; the return statement is part of the
    # elided step described by the placeholder below.
    '''
    Unpack output_list and return model performance scores to OpenMDAO
    '''
# Divide-by-core function
def parallel(*args):
    """Split the test cases into per-core batches, submit them, gather results.

    ``args[0]`` is a sequence of per-input sequences: ``args[0][k][j]`` is the
    value of input ``k`` for case ``j``, so ``len(args[0][0])`` is the number
    of cases. The cases are cut into at most ``n_cores`` contiguous batches of
    ``ceil(n_cases / n_cores)`` cases each. For every case a list of its input
    values is built and the batch's core index is appended as the last element
    so that ``model`` can pick the matching engine.

    Every batch is handed to ``model`` before any result is requested, so no
    batch waits on another batch's results before being submitted.

    Returns:
        list: ``result()`` of every object returned by ``model``, in case
        order (batch by batch). Empty when there are no cases.
    """
    input_list = args[0]
    n_cases = len(input_list[0])
    n_cases_per_batch = math.ceil(n_cases / n_cores)

    # Phase 1: submit every batch and keep what model() returned for it.
    future_list = []
    for i_core in range(n_cores):
        # Batches are contiguous: 0..k-1, k..2k-1, ...
        batch_start = n_cases_per_batch * i_core
        if batch_start >= n_cases:
            # Fewer batches than cores: nothing left to hand out.
            break
        batch_stop = min(batch_start + n_cases_per_batch, n_cases)

        # One row per case: that case's value of every input, followed by the
        # core index (appended exactly once per case).
        inputs_list_this_core = [
            [x[case_index] for x in input_list] + [i_core]
            for case_index in range(batch_start, batch_stop)
        ]
        future_list.append(model(*inputs_list_this_core))

    # Phase 2: collect. result() is expected to block until the run has
    # finished, so no done()/sleep polling loop is needed.
    output_list = []
    for batch_futures in future_list:
        output_list.extend(future.result() for future in batch_futures)

    return output_list


def model(*args):
    """Start one background MATLAB model run per case and return the handles.

    Each positional argument is the input list of one case. Element 0 is read
    as the case's test index and element 13 as the index into the module-level
    ``eng`` list that selects the MATLAB engine for the run.

    Returns:
        list: one entry per case, in argument order, holding whatever
        ``matlab_model(..., background=True)`` returned for that case.
    """
    output = []

    # Call # of parallel instances of the MATLAB engine to run the model based on # of available PC cores (with inputs specified above)
    for case in args:
        test_index = case[0]
        '''
        Unpack model inputs to be given to MATLAB engine
        '''
        # NOTE(review): parallel() appends the core index as the last element
        # of each case, so index 13 assumes exactly 13 preceding inputs.
        core_index = case[13]

        # NOTE(review): model_inputs is not defined in the visible code;
        # presumably it is produced by the elided unpacking step above.
        # background=True requests an asynchronous call, so the returned
        # handle is appended without waiting for the run to finish here.
        items = eng[int(core_index)].matlab_model(model_inputs, nargout=26, background=True)
        output.append(items)

    return output

Solution

  • Here is the code with the solution from @MegaIng.

    # Open one MATLAB engine per core; each engine is later addressed by its
    # position in `eng`.
    n_cores = 6  # Number of available PC cores
    eng = []
    for i in range(n_cores):
        eng.append(matlab.engine.start_matlab())
        # Point every engine at the folder that holds the MATLAB model.
        eng[i].cd(r'C:\path_to_model', nargout=0)


    # OpenMDAO genetic algorithm wraps around Python function that calls function that calls MATLAB engines
    # NOTE(review): model_run_func is defined further down in this listing; it
    # must already be defined by the time this line executes.
    f = omf.wrap(model_run_func)

    # Set up design and objective variables for optimization within OpenMDAO
    prob.model.add_design_var(des_vars)
    prob.model.add_objective(obj_vars)

    # Run the optimization. run_driver is a method: it has to be called, a bare
    # attribute reference does nothing.
    prob.run_driver()
    
    # MATLAB model caller
    def model_run_func(des_vars):
        # Skeleton of the function handed to OpenMDAO: the two triple-quoted
        # strings below are placeholders for code that is not shown here.
        # NOTE(review): des_vars is not referenced in the visible body;
        # presumably it is consumed by the elided set-up step - confirm.
    
        '''
        Set up test cases and specify model inputs
        '''
        # Call function that divides model calls by number of PC cores
        # NOTE(review): inputs_list is not defined in the visible code;
        # presumably it is built by the elided set-up step above - confirm.
        output_list =  parallel(inputs_list)
    
        # As shown, nothing is returned; the return statement is part of the
        # elided step described by the placeholder below.
        '''
        Unpack output_list and return model performance scores to OpenMDAO
        '''
    # Divide-by-core function
    def parallel(*args):
        """Split the test cases into per-core batches, submit them, gather results.

        ``args[0]`` is a sequence of per-input sequences: ``args[0][k][j]`` is
        the value of input ``k`` for case ``j``, so ``len(args[0][0])`` is the
        number of cases. The cases are cut into at most ``n_cores`` contiguous
        batches of ``ceil(n_cases / n_cores)`` cases each. For every case a
        list of its input values is built and the batch's core index is
        appended as the last element so that ``model`` can pick the matching
        engine.

        Every batch is handed to ``model`` before any result is requested, so
        no batch waits on another batch's results before being submitted.

        Returns:
            list: ``result()`` of every object returned by ``model``, in case
            order (batch by batch). Empty when there are no cases.
        """
        input_list = args[0]
        n_cases = len(input_list[0])
        n_cases_per_batch = math.ceil(n_cases / n_cores)

        # Phase 1: submit every batch and keep what model() returned for it.
        future_list = []
        for i_core in range(n_cores):
            # Batches are contiguous: 0..k-1, k..2k-1, ...
            batch_start = n_cases_per_batch * i_core
            if batch_start >= n_cases:
                # Fewer batches than cores: nothing left to hand out.
                break
            batch_stop = min(batch_start + n_cases_per_batch, n_cases)

            # One row per case: that case's value of every input, followed by
            # the core index (appended exactly once per case).
            inputs_list_this_core = [
                [x[case_index] for x in input_list] + [i_core]
                for case_index in range(batch_start, batch_stop)
            ]
            future_list.append(model(*inputs_list_this_core))

        # Phase 2: collect. result() is expected to block until the run has
        # finished, so no done()/sleep polling loop is needed.
        output_list = []
        for batch_futures in future_list:
            output_list.extend(future.result() for future in batch_futures)

        return output_list
    
    
    def model(*args):
        """Start one background MATLAB model run per case and return the handles.

        Each positional argument is the input list of one case. Element 0 is
        read as the case's test index and element 13 as the index into the
        module-level ``eng`` list that selects the MATLAB engine for the run.

        Returns:
            list: one entry per case, in argument order, holding whatever
            ``matlab_model(..., background=True)`` returned for that case.
        """
        output = []

        # Call # of parallel instances of the MATLAB engine to run the model based on # of available PC cores (with inputs specified above)
        for case in args:
            test_index = case[0]
            '''
            Unpack model inputs to be given to MATLAB engine
            '''
            # NOTE(review): parallel() appends the core index as the last
            # element of each case, so index 13 assumes exactly 13 preceding
            # inputs.
            core_index = case[13]

            # NOTE(review): model_inputs is not defined in the visible code;
            # presumably it is produced by the elided unpacking step above.
            # background=True requests an asynchronous call, so the returned
            # handle is appended without waiting for the run to finish here.
            items = eng[int(core_index)].matlab_model(model_inputs, nargout=26, background=True)
            output.append(items)

        return output