I'm currently setting up a automated simulation pipeline for OpenFOAM (CFD library) using the PyFoam library within Python to create a large database for machine learning purposes. The database will have around 500k distinct simulations. To run this pipeline on multiple machines, I'm using the multiprocessing.Pool.starmap_async(args)
option which will continually start a new simulation once the old simulation has completed.
However, since some of the simulations might / will crash, I want to generate a textfile with all cases which have crashed.
I've already found this thread which implements the multiprocessing.Manager.Queue()
and adds a listener but I failed to get it running with starmap_async()
. For my testing I'm trying to print the case name for any simulation which has been completed but currently only one entry is written into the text file instead of all of them (the simulations all complete successfully).
So my question is how can I write a message to a file for each simulation which has completed.
The current code layout looks roughly like this - only important snipped has been added as the remaining code can't be run without OpenFOAM and additional customs scripts which were created for the automation.
Any help is highly appreciated! :)
from PyFoam.Execution.BasicRunner import BasicRunner
from PyFoam.Execution.ParallelExecution import LAMMachine
import numpy as np
import multiprocessing
import itertools
import psutil
# Defining global variables
manager = multiprocessing.Manager()
queue = manager.Queue()
def runCase(airfoil, angle, velocity):
# define simulation name
newCase = str(airfoil) + "_" + str(angle) + "_" + str(velocity)
'''
A lot of pre-processing commands to prepare the simulation
which has been removed from snipped such as generate geometry, create mesh etc...
'''
# run simulation
machine = LAMMachine(nr=4) # set number of cores for parallel execution
simulation = BasicRunner(argv=[solver, "-case", case.name], silent=True, lam=machine, logname="solver")
simulation.start() # start simulation
# check if simulation has completed
if simulation.runOK():
# write message into queue
queue.put(newCase)
if not simulation.runOK():
print("Simulation did not run successfully")
def listener(queue):
fname = 'errors.txt'
msg = queue.get()
while True:
with open(fname, 'w') as f:
if msg == 'complete':
break
f.write(str(msg) + '\n')
def main():
# Create parameter list
angles = np.arange(-5, 0, 1)
machs = np.array([0.15])
nacas = ['0012']
paramlist = list(itertools.product(nacas, angles, np.round(machs, 9)))
# create number of processes and keep 2 cores idle for other processes
nCores = psutil.cpu_count(logical=False) - 2
nProc = 4
nProcs = int(nCores / nProc)
with multiprocessing.Pool(processes=nProcs) as pool:
pool.apply_async(listener, (queue,)) # start the listener
pool.starmap_async(runCase, paramlist).get() # run parallel simulations
queue.put('complete')
pool.close()
pool.join()
if __name__ == '__main__':
main()
First, when your with multiprocessing.Pool(processes=nProcs) as pool:
exits, there will be an implicit call to pool.terminate()
, which will kill all pool processes and with it any running or queued up tasks. There is no point in calling queue.put('complete')
since nobody is listening.
Second, your 'listener" task gets only a single message from the queue. If is "complete", it terminates immediately. If it is something else, it just loops continuously writing the same message to the output file. This cannot be right, can it? Did you forget an additional call to queue.get()
in your loop?
Third, I do not quite follow your computation for nProcs
. Why the division by 4? If you had 5 physical processors nProcs
would be computed as 0. Do you mean something like:
nProcs = psutil.cpu_count(logical=False) // 4
if nProcs == 0:
nProcs = 1
elif nProcs > 1:
nProcs -= 1 # Leave a core free
Fourth, why do you need a separate "listener" task? Have your runCase
task return whatever message is appropriate according to how it completes back to the main process. In the code below, multiprocessing.pool.Pool.imap
is used so that results can be processed as the tasks complete and results returned:
from PyFoam.Execution.BasicRunner import BasicRunner
from PyFoam.Execution.ParallelExecution import LAMMachine
import numpy as np
import multiprocessing
import itertools
import psutil
def runCase(tpl):
# Unpack tuple:
airfoil, angle, velocity = tpl
# define simulation name
newCase = str(airfoil) + "_" + str(angle) + "_" + str(velocity)
... # Code omitted for brevity
# check if simulation has completed
if simulation.runOK():
return '' # No error
# Simulation did not run successfully:
return f"Simulation {newcase} did not run successfully"
def main():
# Create parameter list
angles = np.arange(-5, 0, 1)
machs = np.array([0.15])
nacas = ['0012']
# There is no reason to convert this into a list; it
# can be lazilly computed:
paramlist = itertools.product(nacas, angles, np.round(machs, 9))
# create number of processes and keep 1 core idle for main process
nCores = psutil.cpu_count(logical=False) - 1
nProc = 4
nProcs = int(nCores / nProc)
with multiprocessing.Pool(processes=nProcs) as pool:
with open('errors.txt', 'w') as f:
# Process message results as soon as the task ends.
# Use method imap_unordered if you do not care about the order
# of the messages in the output.
# We can only pass a single argument using imap, so make it a tuple:
for msg in pool.imap(runCase, zip(paramlist)):
if msg != '': # Error completion
print(msg)
print(msg, file=f)
pool.join() # Not really necessary here
if __name__ == '__main__':
main()