python | parallel-processing | multiprocessing | parallelism-amdahl

Why does the get() operation in multiprocessing.Pool.map_async take so long?


import multiprocessing as mp
import numpy as np

pool   = mp.Pool( processes = 4 )
inp    = np.linspace( 0.01, 1.99, 100 )
result = pool.map_async( func, inp ) #Line1 ( func is some Python function which acts on input )
output = result.get()                #Line2

So, I was trying to parallelize some code in Python, using a .map_async() method on a multiprocessing.Pool() instance.

I noticed that while
Line1 takes around a thousandth of a second,
Line2 takes about 0.3 seconds.

Is there a better way to do this or a way to get around the bottleneck caused by Line2,
or
am I doing something wrong here?

( I am rather new to this. )


Solution

  • Am I doing something wrong here?

    Do not panic, many users do the very same - they pay more than they receive.

    This is a common lesson, not about using some "promising" syntax-constructor, but about paying the actual costs of using it.

    The story is long, but the effect is straightforward - you expected a low-hanging fruit, yet had to pay an immense cost for process-instantiation, work-package distribution and collection of results, all that circus just for doing a few rounds of func()-calls.


    Wait - stop! Parallelisation was supposed to SPEED UP my processing?!?

    Well, who told you that any such ( potential ) speedup is for free?

    Let's be quantitative and instead measure the actual code-execution time.

    Benchmarking is always a fair move. It helps us escape from mere expectations and gets us to quantitative, evidence-supported knowledge:

    from zmq import Stopwatch; aClk = Stopwatch() # this is a handy tool to do so
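
    ( In case pyzmq is not installed, a drop-in stand-in built on time.perf_counter_ns() - an assumption of mine, not part of the original recipe - offers the same start() / stop() interface, returning the elapsed time in [us]: )

    import time

    class Stopwatch:                                     # hypothetical stand-in for zmq.Stopwatch
        def start( self ):
            self._t0 = time.perf_counter_ns()            # remember the start timestamp [ns]
        def stop( self ):
            return ( time.perf_counter_ns() - self._t0 ) // 1000   # elapsed time in [us]

    aClk = Stopwatch()                                   # used exactly the same way as above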
    

    AS-IS test:

    Before moving forwards, one ought to record this trio:

    >>> aClk.start(); _ = [   func( SEQi ) for SEQi in inp ]; aClk.stop() # [SEQ] 
    >>> HowMuchWillWePAY2RUN( func, 4, 100 )                              # [RUN]
    >>> HowMuchWillWePAY2MAP( func, 4, 100 )                              # [MAP]
    

    This will set the span of the performance envelopes: from a pure-[SERIAL] [SEQ]-of-calls, to an un-optimised joblib.Parallel() run, to the said multiprocessing.Pool() .map_async(), or to any other tool one may wish to add to the experiment.


    Test-case A:

    Intent:
    to measure the cost of a { process | job }-instantiation, we need a NOP-work-package payload, one that spends almost nothing "there", returns "back" right away, and does not incur any additional add-on costs ( be it for transmitting input parameters or for returning any value )

    def a_NOP_FUN( aNeverConsumedPAR ):
        """                                                 __doc__
        The intent of this FUN() is indeed to do nothing at all,
                                 so as to be able to benchmark
                                 all the process-instantiation
                                 add-on overhead costs.
        """
        pass
    

    So, the setup-overhead add-on costs comparison is here:

    #-------------------------------------------------------<function a_NOP_FUN
    [SEQ]-pure-[SERIAL] worked within ~   37 ..     44 [us] on this localhost
    [MAP]-just-[CONCURRENT] tool        2536 ..   7343 [us]
    [RUN]-just-[CONCURRENT] tool      111162 .. 112609 [us]
    

    Using a strategy of
    joblib.delayed() on joblib.Parallel() task-processing:

    def HowMuchWillWePAY2RUN( aFun2TEST = a_NOP_FUN, JOBS_TO_SPAWN = 4, RUNS_TO_RUN = 10 ):
        from zmq import Stopwatch; aClk = Stopwatch()
        import joblib                            # joblib.Parallel() / joblib.delayed() used below
        try:
             aClk.start()
             joblib.Parallel(  n_jobs = JOBS_TO_SPAWN
                              )( joblib.delayed( aFun2TEST )
                                               ( aFunPARAM )
                                           for ( aFunPARAM )
                                           in  range( RUNS_TO_RUN )
                                 )
        except:
             pass                                # a failed run still gets its clock stopped below
        finally:
             try:
                 _ = aClk.stop()
             except:
                 _ = -1                          # sentinel, in case the Stopwatch never started
        pMASK = "CLK:: {0:_>24d} [us] @{1: >4d}-JOBs ran{2: >6d} RUNS {3:}"
        print( pMASK.format( _,
                             JOBS_TO_SPAWN,
                             RUNS_TO_RUN,
                             " ".join( repr( aFun2TEST ).split( " " )[:2] )
                             )
                )
    

    Using a strategy of a lightweight
    .map_async() method on a multiprocessing.Pool() instance:

    def HowMuchWillWePAY2MAP( aFun2TEST = a_NOP_FUN, PROCESSES_TO_SPAWN = 4, RUNS_TO_RUN = 1 ):
        from zmq import Stopwatch; aClk = Stopwatch()
        try:
             import numpy           as np
             import multiprocessing as mp
             
             pool = mp.Pool( processes = PROCESSES_TO_SPAWN )
             inp  = np.linspace( 0.01, 1.99, 100 )
             
             aClk.start()
             for i in range( RUNS_TO_RUN ):              # xrange() is Python-2 only
                 result = pool.map_async( aFun2TEST, inp )
                 output = result.get()                   # blocks until all results have arrived
        except:
             pass
        finally:
             try:
                 _ = aClk.stop()
             except:
                 _ = -1
                 pass
        pMASK = "CLK:: {0:_>24d} [us] @{1: >4d}-PROCs ran{2: >6d} RUNS {3:}"
        print( pMASK.format( _,
                             PROCESSES_TO_SPAWN,
                             RUNS_TO_RUN,
                             " ".join( repr( aFun2TEST ).split( " ")[:2] )
                             )
                )
    

    So, the first set of pain and surprises comes straight from the actual cost-of-doing-NOTHING in a concurrent pool of joblib.Parallel():

     CLK:: __________________117463 [us] @   4-JOBs ran    10 RUNS <function a_NOP_FUN
     CLK:: __________________111182 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: __________________110229 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: __________________110095 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: __________________111794 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: __________________110030 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: __________________110697 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: _________________4605843 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: __________________336208 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: __________________298816 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: __________________355492 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: __________________320837 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: __________________308365 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: __________________372762 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: __________________304228 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: __________________337537 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
     CLK:: __________________941775 [us] @ 123-JOBs ran 10000 RUNS <function a_NOP_FUN
     CLK:: __________________987440 [us] @ 123-JOBs ran 10000 RUNS <function a_NOP_FUN
     CLK:: _________________1080024 [us] @ 123-JOBs ran 10000 RUNS <function a_NOP_FUN
     CLK:: _________________1108432 [us] @ 123-JOBs ran 10000 RUNS <function a_NOP_FUN
     CLK:: _________________7525874 [us] @ 123-JOBs ran100000 RUNS <function a_NOP_FUN
    

    So, this scientifically fair and rigorous test has started from this simplest-ever case, already showing the benchmarked costs of all the associated code-execution setup-overheads - the smallest-ever joblib.Parallel() penalty, a sine qua non.

    This points us in the direction where real-world algorithms live - best explored by next adding larger and larger "payload"-sizes into the testing loop.
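
    ( A back-of-the-envelope check - my own arithmetic on the figures measured above, not a part of the original recipe - already hints how much useful work a single func()-call has to carry, before the pool can ever pay off: )

    nCALLS           =  100          # how many func()-calls the job carries ( as in inp )
    oMAP_overhead_us = 5000          # ~ 2536 .. 7343 [us] measured above for the [MAP] tool
    nWORKERS         =    4

    # the pool only helps once the per-call work T [us] satisfies:
    #     nCALLS * T  >  oMAP_overhead_us + nCALLS * T / nWORKERS
    T_breakeven_us = oMAP_overhead_us / ( nCALLS * ( 1 - 1. / nWORKERS ) )
    print( "func() must burn > {0:.1f} [us] per call before the pool pays off".format( T_breakeven_us ) )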


    Now, we know the penalty
    for going into a "just"-[CONCURRENT] code-execution - and next?

    Using this systematic and lightweight approach, we may go forward in the story, as we will also need to benchmark the add-on costs and other Amdahl's Law indirect effects of { remote-job-PAR-XFER(s) | remote-job-MEM.alloc(s) | remote-job-CPU-bound-processing | remote-job-fileIO(s) } ( a payload sketch for the XFER-costs is appended below, after Test-case D ).

    A function template like the ones below may help with the re-testing ( as you can see, there will be a lot to re-run, while O/S noise and some additional artifacts will step into the actual cost-of-use patterns ):


    Test-case B:

    Once we have paid the up-front cost, the next most common mistake is to forget the costs of memory allocations. So, let's test it:

    def a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR( aNeverConsumedPAR, SIZE1D = 1000 ):
        """                                                 __doc__
        The intent of this FUN() is to do nothing but
                                 a MEM-allocation
                                 so as to be able to benchmark
                                 all the process-instantiation
                                 add-on overhead costs.
        """
        import numpy as np              # yes, deferred import, libs do defer imports
        aMemALLOC = np.zeros( ( SIZE1D, #       so as to set
                                SIZE1D, #       realistic ceilings
                                SIZE1D, #       as how big the "Big Data"
                                SIZE1D  #       may indeed grow into
                                ),
                              dtype = np.float64,
                              order = 'F'
                              )         # .ALLOC + .SET
        aMemALLOC[2,3,4,5] = 8.7654321  # .SET
        aMemALLOC[3,3,4,5] = 1.2345678  # .SET
        
        return aMemALLOC[2:3,3,4,5]
    

    In case your platform stops being able to allocate the requested memory-blocks, we head-bang into another kind of problem ( a class of hidden glass-ceilings one hits when trying to go-parallel in a physical-resources agnostic manner ). One may edit the SIZE1D scaling so as to at least fit into the platform's RAM addressing / sizing capabilities ( a small footprint helper is sketched right after the results below ), yet the performance envelopes of the real-world problem computing remain of great interest here:

    >>> HowMuchWillWePAY2RUN( a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR, 200, 1000 )
    

    may yield
    a cost-to-pay anywhere between 0.1 [s] and +9 [s] (!!)
    just for STILL doing NOTHING, except now no longer forgetting about some realistic MEM-allocation add-on costs "there"

    CLK:: __________________116310 [us] @   4-JOBs ran    10 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________120054 [us] @   4-JOBs ran    10 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________129441 [us] @  10-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________123721 [us] @  10-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________127126 [us] @  10-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________124028 [us] @  10-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________305234 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________243386 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________241410 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________267275 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________244207 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________653879 [us] @ 100-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________405149 [us] @ 100-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________351182 [us] @ 100-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________362030 [us] @ 100-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: _________________9325428 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________680429 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________533559 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: _________________1125190 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
    CLK:: __________________591109 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
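
    ( Given that the default 4-D shape of SIZE1D = 1000 asks for 1000**4 * 8 [B] ~ 8 [TB] of float64 cells, a tiny helper - my own addition, not part of the original recipe - may help to pick a SIZE1D that still fits into the platform RAM, before launching a run: )

    import numpy as np

    def a_MEM_FOOTPRINT_ESTIMATE( SIZE1D = 1000, nDIMs = 4, dtype = np.float64 ):
        """ Return the would-be allocation size in [GB] for an nDIMs-cube of edge SIZE1D. """
        return ( SIZE1D ** nDIMs ) * np.dtype( dtype ).itemsize / 1E9          # [GB]

    print( a_MEM_FOOTPRINT_ESTIMATE( 1000 ) )   # ~ 8000.0 [GB] ... way beyond common RAM sizes
    print( a_MEM_FOOTPRINT_ESTIMATE(  100 ) )   # ~    0.8 [GB] ... fits most localhosts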
    

    Test-case C:

    kindly read the tail sections of this post

    Test-case D:

    kindly read the tail sections of this post
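
    ( For the remaining cost categories listed above - say, the remote-job-PAR-XFER(s) - a payload sketch like this one ( my own illustration, not a verbatim part of the original test-suite ) can be plugged into the very same HowMuchWillWePAY2MAP() harness, so as to watch how the parameter-pickling / result-return costs grow with the size of the data shipped across the process boundary: )

    import numpy as np

    def a_NOP_FUN_WITH_JUST_A_RESULT_XFER( aNeverConsumedPAR, SIZE1D = 1000 ):
        """
        The intent of this FUN() is to do nothing at all,
        except returning a SIZE1D-long vector, so as to benchmark
        the add-on costs of pickling + transferring results "back"
        across the process boundary.
        """
        return np.zeros( SIZE1D, dtype = np.float64 )    # result must be serialised + sent back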


    Epilogue:

    For each and every "promise", the fairest next step is to first cross-validate the actual code-execution costs, before starting any code re-engineering. The sum of the real-world platform's add-on costs may devastate any expected speedup, even though the original, overhead-naive Amdahl's Law might have promised some speedup-effects.
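
    ( A minimal sketch of such a cross-validation - my own overhead-aware re-reading of Amdahl's Law, plugging in the kinds of overheads measured above as assumed figures - shows how quickly the add-on costs eat the theoretical speedup: )

    def overhead_aware_speedup( T_serial_us, p, N, oSETUP_us, oCOLLECT_us ):
        """
        Compare a pure-[SERIAL] duration against a [PARALLEL] one, where only
        the p-fraction of the work gets divided among N workers and the measured
        setup + result-collection overheads are paid on top.
        """
        T_parallel_us = ( T_serial_us * ( 1 - p )        # serial fraction, never sped up
                        + T_serial_us *       p   / N    # parallel fraction, ideally split
                        + oSETUP_us                      # process-instantiation costs
                        + oCOLLECT_us                    # work distribution + .get() costs
                          )
        return T_serial_us / T_parallel_us

    print( overhead_aware_speedup(       40, 1.0, 4, 2500, 2500 ) )   # ~ 0.008  a SLOWDOWN
    print( overhead_aware_speedup( 10000000, 1.0, 4, 2500, 2500 ) )   # ~ 3.99   approaching 4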

    As Mr. W. Edwards Deming has expressed many times, without DATA we leave ourselves with nothing but OPINIONS.


    A bonus part:
    having read this far, one might have already found that there is no kind of "drawback" or "error" in #Line2 per se, but that careful design practice will show whether there is any better syntax-constructor, one that spends less to achieve more ( as the actual resources ( CPU, MEM, IOs, O/S ) permit on the code-execution platform ). Anything else is not principally different from fortune-telling.
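
    ( One such knob worth putting onto the very same benchmarking harness - a suggestion of mine, not a claim from the original answer - is the chunksize parameter of .map_async(), which ships work in larger batches and so pays the inter-process transfer costs fewer times; whether it, or a plain serial loop, wins for your func() and your 100-item inp is again a question only the Stopwatch can answer: )

    import multiprocessing as mp
    import numpy as np
    from zmq import Stopwatch

    def func( x ):                                       # a cheap stand-in for the OP's func()
        return x * x

    if __name__ == '__main__':
        inp  = np.linspace( 0.01, 1.99, 100 )
        aClk = Stopwatch()

        aClk.start(); _ = [ func( x ) for x in inp ]
        print( "SERIAL           {0:>9d} [us]".format( aClk.stop() ) )

        pool = mp.Pool( processes = 4 )
        aClk.start(); _ = pool.map_async( func, inp                 ).get()
        print( "POOL  default    {0:>9d} [us]".format( aClk.stop() ) )

        aClk.start(); _ = pool.map_async( func, inp, chunksize = 25 ).get()
        print( "POOL  chunksize  {0:>9d} [us]".format( aClk.stop() ) )

        pool.close(); pool.join()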