Maybe this is really simple, but I am having a bit of a problem understanding this.
The challenge I have is to execute a child parallel function from inside a mother function. That mother function should run only once while waiting for the results of the child parallel function calls.
I wrote a little example which shows my dilemma.
import random
import string
from joblib import Parallel, delayed
import multiprocessing

def jobToDoById(id):
    # do some other logic based on the ID given
    rand_str = ''.join(random.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for i in range(10))
    return [id, rand_str]

def childFunctionParallel(jobs):
    num_cores = multiprocessing.cpu_count()
    num_cores = num_cores - 1
    if __name__ == '__main__':
        p = Parallel(n_jobs=num_cores)(delayed(jobToDoById)(i) for i in jobs)
        return p
def childFunctionSerial(jobs):
    result = []
    for job in jobs:
        job_result = jobToDoById(job)
        result.append(job_result)
    return result
def motherFunction(countries_cities, doInParallel):
    result = []
    print("Start mainLogic")
    for country in countries_cities:
        city_list = countries_cities[country]
        if doInParallel:
            cities_result = childFunctionParallel(city_list)
        else:
            cities_result = childFunctionSerial(city_list)
        result.append(cities_result)
        # ..... do some more logic
    # ..... do some more logic before returning
    print("End mainLogic")
    return result
print("Start Program")
countries_cities = {
    "United States" : ["Alabama", "Hawaii", "Mississippi", "Pennsylvania"],
    "United Kingdom": ["Cambridge", "Coventry", "Gloucester", "Nottingham"],
    "France"        : ["Marseille", "Paris", "Saint-Denis", "Nanterre", "Aubervilliers"],
    "Denmark"       : ["Aarhus", "Slagelse", "Nykøbing F", "Rønne", "Odense"],
    "Australia"     : ["Sydney", "Townsville", "Bendigo", "Bathurst", "Busselton"],
}
result_mother = motherFunction(countries_cities, doInParallel=True) # should be executed only once
print(result_mother)
print("End Program")
If you toggle doInParallel between True and False you can see the problem. When running with childFunctionSerial(), the motherFunction() runs only once. But when we run with childFunctionParallel(), the motherFunction() is executed multiple times. Both give the same result, but the problem I have is that motherFunction() should be executed only once.
Two questions:

1. How do I restructure the program so that the mother function executes only once, and from inside it starts the parallel jobs, without running multiple instances of the same mother function?
2. How can I pass a second parameter to jobToDoById() besides the id?
( id, .., )

The second question was simple: pass a tuple instead of a bare id. The pattern is commonly used, so one can meet it in many examples:
import random, string

def jobToDoById( aTupleOfPARAMs = ( -1, ) ):         # jobToDoById( id ):
    #                                                # do some other logic based on the ID given
    if not type( aTupleOfPARAMs ) is tuple:          # FUSE PROTECTION
        return [-1, "call interface violated"]
    if aTupleOfPARAMs[0] == -1:                      # FUSE PROTECTION
        return [-1, None]
    # ...............................................# GO GET PROCESSED:
    rand_str = ''.join( random.choice( string.ascii_lowercase
                                     + string.ascii_uppercase
                                     + string.digits
                                       )
                        for i in range( 10 )
                        )
    return [aTupleOfPARAMs[0], rand_str]             # fixed: id was never defined in this scope
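A minimal usage sketch under this calling convention; the second tuple element and the name some_param are illustrative, not from the original post:

from joblib import Parallel, delayed

jobs       = ["Sydney", "Townsville", "Bendigo"]
some_param = "extra-parameter"                       # hypothetical second parameter

# each call now receives exactly one argument: the tuple ( id, some_param, )
results = Parallel( n_jobs = 2 )( delayed( jobToDoById )( ( job, some_param, ) )
                                  for job in jobs
                                  )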
The first question is a bit harder, yet way more interesting, as system-design's principal differences among [SERIAL], "just"-[CONCURRENT] and true-[PARALLEL] system-scheduling policies of more than one process are not always respected in popular media (and sometimes not even in Academia).
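Before the system-design part, a mechanical note on why motherFunction() re-runs at all: on platforms where multiprocessing has to spawn fresh interpreters (Windows being the typical case), every worker re-imports the top-level script, so every module-level statement executes again in each worker. A minimal sketch of the usual remedy, reusing motherFunction() from the question and keeping all top-level work behind the __main__ guard:

import multiprocessing
from joblib import Parallel, delayed

# plain data is safe at module level; a re-importing worker just re-creates it
countries_cities = { "Denmark": ["Aarhus", "Odense"] }

if __name__ == '__main__':          # a re-importing worker skips this whole block
    print( "Start Program" )
    result_mother = motherFunction( countries_cities, doInParallel = True )
    print( result_mother )
    print( "End Program" )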
Your code mentioned the joblib.Parallel and multiprocessing modules explicitly, yet the documentation says:

    By default Parallel uses the Python multiprocessing module to fork separate Python worker processes to execute tasks concurrently on separate CPUs. This is a reasonable default for generic Python programs but it induces some overhead as the input and output data need to be serialized in a queue for communication with the worker processes.
There are two implications: your processing will pay dual [TIME]-domain and [SPACE]-domain overhead costs that may easily become unacceptably huge OVERHEAD COSTS (and if one has also noticed the words "data" and "serialized" in the citation above, the better). For details see the re-formulated Amdahl's Law, as detailed in its Section: Criticism, and in the parallelism-amdahl posts:
The whole Python interpreter, including its data and internal state, is fully forked (so you get as many copies as instructed, each running just one process-flow, which is done so as not to lose performance on GIL-round-robin fragmentation / the Only-1-runs-All-Others-have-to-wait type of GIL-blocking / stepping, present in any 1+ processing-flows if made in threading-based pools etc.)
Besides all the complete Python interpreter + state re-instantiations that have to take place, as noted above, also ALL <data-IN> + <data-OUT> are:
--------------------------------MAIN-starts-to-escape-from-pure-[SERIAL]-processing
 0:                             MAIN forks self                  -in-pure-[SERIAL]
                                     [1]                         -in-pure-[SERIAL]
                                     [2]                         -in-pure-[SERIAL]
                                     ...                         -in-pure-[SERIAL]
                                     [n_jobs] - as many copies
                                                of self as
                                                requested        -in-pure-[SERIAL]
--------------------------------MAIN-can-continue-in-"just"-[CONCURRENT]-after-[SERIAL]:
 + 1st-Data-IN-SERialised-in-MAIN's-"__main__"  job(1), job(2), .., job(n_jobs):[SERIAL]
 + 2nd-Data-IN-QUEued      in MAIN for all      job(1), job(2), .., job(n_jobs):[SERIAL]
 + 3rd-Data-IN-DEQueued              [ith_job]s:       "just"-[CONCURRENT]||X||X||
 + 4th-Data-IN-DESerialised          [ith_job]s:       "just"-[CONCURRENT]|X||X|||
 + ( ...process operates the useful  [ith_job]s-<The PAYLOAD>-planned... )||X|||X|
 + 5th-Data-OUT-SERialised           [ith_job]s:       "just"-[CONCURRENT]||||X|||
 + 6th-Data-OUT-QUEued               [ith_job]s:       "just"-[CONCURRENT]|X|X|X||
 + 7th-Data-OUT-DEQueued   in-MAIN <--          job(1), job(2), .., job(n_jobs):[SERIAL]
 + 8th-Data-OUT-DESerialised-in-MAIN's-"__main__" job(1), job(2), .., job(n_jobs):[SERIAL]
--------------------------------MAIN-can-continue-processing---------in-pure-[SERIAL]
 ...                                                                 -in-pure-[SERIAL]
which all together always costs non-negligible overhead time (for equations and details, kindly Ref. the overhead-strict re-formulation of the net-speedups achievable upon these add-on overhead costs, best before diving into a refactoring, where your machine would otherwise pay way more than what it gets back from attempts to ignore these principal and benchmarkable overhead costs).
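A back-of-the-envelope sketch of that overhead-strict re-formulation; the parameter names p, o_setup and o_comms are illustrative, all expressed as fractions of the original single-process run-time:

def net_speedup( p       = 0.95,  # parallelisable fraction of the original run-time
                 N       = 4,     # number of worker processes
                 o_setup = 0.00,  # add-on overhead: forks + interpreter re-instantiations
                 o_comms = 0.00   # add-on overhead: SER/DES + queue transfers
                 ):
    # classic Amdahl:           S = 1 / ( ( 1 - p ) + p / N )
    # overhead-strict re-form:  S = 1 / ( ( 1 - p ) + p / N + o_setup + o_comms )
    return 1 / ( ( 1 - p ) + p / N + o_setup + o_comms )

print( net_speedup( 0.95, 4 ) )                # ~ 3.48 x, overheads ignored
print( net_speedup( 0.95, 4, 0.10, 0.20 ) )    # ~ 1.70 x, overheads included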
For benchmarking these overhead costs, each separately and with microsecond resolution, tools are available (yet not all Stack Overflow members have felt happy about doing quantitatively robust benchmarking of these); just check other posts on parallelism-amdahl here on Stack Overflow.
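A minimal do-it-yourself benchmark sketch, comparing the end-to-end wall-clock time of a serial loop against a joblib.Parallel run over the same deliberately tiny payload (exactly the regime where the add-on overheads dominate):

import time
from joblib import Parallel, delayed

def tiny_payload( i ):
    return i * i                                 # deliberately negligible work

if __name__ == '__main__':
    jobs = range( 1000 )

    t0 = time.perf_counter()
    serial   = [ tiny_payload( i ) for i in jobs ]
    t1 = time.perf_counter()
    parallel = Parallel( n_jobs = 2 )( delayed( tiny_payload )( i ) for i in jobs )
    t2 = time.perf_counter()

    print( "SERIAL   : {0:9.6f} [s]".format( t1 - t0 ) )
    print( "PARALLEL : {0:9.6f} [s]".format( t2 - t1 ) )   # expect this to be way larger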
The second principal limitation of the joblib.Parallel implementation, one that strikes, if not head-bangs, straight into Amdahl's Law, is a resources-real-availability-agnostic optimism, whereas resources-state-aware scheduling is what happens on every real-world system.

One may expect any high degree of parallel code execution, but unless complex measures are taken on an end-to-end (top-to-bottom) system coverage, all processing goes into but a "just"-[CONCURRENT] schedule (i.e. if resources permit). This aspect is way beyond the footprint of this post, and was just naively put into the scheme above, showing that if CPU-cores (and principally any other resource-class) are not available, the concurrency will never reach the levels of speedup that a resources-availability-agnostic original Amdahl's Law was promising.
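A small resources-state-aware sketch for picking n_jobs; note that os.sched_getaffinity() is POSIX-only, hence the fallback to os.cpu_count(), and that neither reports anything about the current load on those cores:

import os

def usable_cores():
    # cores this very process may actually be scheduled onto (POSIX only),
    # which may be fewer than the physically present ones
    try:
        return len( os.sched_getaffinity( 0 ) )
    except AttributeError:
        return os.cpu_count() or 1

n_jobs = max( 1, usable_cores() - 1 )    # leave one core for MAIN itself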