python, ipython-parallel

How to efficiently chain ipyparallel tasks and pass intermediate results to engines?


I am trying to chain several tasks together in ipyparallel, like this:

import ipyparallel
client = ipyparallel.Client()
view = client.load_balanced_view()
def task1(x):
    ## Do some work.
    return x * 2
def task2(x):
    ## Do some work.
    return x * 3
def task3(x):
    ## Do some work.
    return x * 4
results1 = view.map_async(task1, [1, 2, 3])
results2 = view.map_async(task2, results1.get())
results3 = view.map_async(task3, results2.get())

However, this code won't submit any task2 until every task1 is done, so it is essentially blocking. My tasks take different amounts of time, and this is very inefficient. Is there an easy way to chain these steps efficiently, so that engines can get the results from previous steps? Something like:

def task2(x):
    ## Do some work.
    return x.get() * 3 ## Get AsyncResult out.
def task3(x):
    ## Do some work.
    return x.get() * 4 ## Get AsyncResult out.
results1 = [view.apply_async(task1, x) for x in [1, 2, 3]]
results2 = []
for x in results1:
    view.set_flags(after=x.msg_ids)
    results2.append(view.apply_async(task2, x))
results3 = []
for x in results2:
    view.set_flags(after=x.msg_ids)
    results3.append(view.apply_async(task3, x))

Apparently, this will fail, as AsyncResult is not picklable.

I was considering a few solutions:

  1. Use view.map_async(ordered=False).

    results1 = view.map_async(task1, [1, 2, 3], ordered=False)
    results2 = []
    for x in results1:  ## With ordered=False, iteration yields task1 results as they complete.
        results2.append(view.apply_async(task2, x))
    

    But this has to wait for every task1 to finish before any task3 can be submitted, so it is still blocking.

  2. Use asyncio.

    import asyncio

    @asyncio.coroutine
    def submitter(x):
        result1 = yield from asyncio.wrap_future(view.apply_async(task1, x))
        result2 = yield from asyncio.wrap_future(view.apply_async(task2, result1))
        result3 = yield from asyncio.wrap_future(view.apply_async(task3, result2))
        return result3
    
    @asyncio.coroutine
    def submit_all(ls):
        jobs = [submitter(x) for x in ls]
        results = []
        for async_r in asyncio.as_completed(jobs):
            r = yield from async_r
            results.append(r)
        ## Do some work, like analysing results.
    

    This works, but the code soon becomes messy and unintuitive when more complicated tasks are introduced.

Thank you for your help.


Solution

  • Option 1: chain futures

    IPython parallel isn't the best at doing this, because the chaining has to be done at the client level: you have to wait for results to complete and return to the client before submitting the dependent tasks. Essentially, your asyncio submit_all is the right way to do it for IPython parallel. You can get something a little more generic by writing a chain function that uses add_done_callback to submit the new task when the previous one completes:

    from concurrent.futures import Future
    from functools import partial
    
    
    def chain_apply(view, func, future):
        """Chain a call to view.apply(func, future.result()) when future is ready.
    
        Returns a Future for the subsequent result.
        """
        f2 = Future()
        # when f1 is ready, submit a new task for func on its result
        def apply_func(f):
            if f.exception():
                f2.set_exception(f.exception())
                return
            print('submitting %s(%s)' % (func.__name__, f.result()))
            ar = view.apply_async(func, f.result())
            # when ar is done, pass through the result to f2
            ar.add_done_callback(lambda ar: f2.set_result(ar.get()))
    
        future.add_done_callback(apply_func)
        return f2
    
    
    def chain_map(view, func, list_of_futures):
        """Chain a new callback on a list of futures."""
        return [ chain_apply(view, func, f) for f in list_of_futures ]
    
    # use builtin map with apply, since we want one Future per item
    results1 = map(partial(view.apply, task1), [1, 2, 3])
    results2 = chain_map(view, task2, results1)
    results3 = chain_map(view, task3, results2)
    print("Waiting for results")
    [ r.result() for r in results3 ]
    

    As with any example of add_done_callback, it can be written with coroutines, but I find the callbacks in this case to be fine. This should at least be a fairly generic utility that you can use to compose your pipeline.
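    For reference, the same per-item chaining can be sketched with coroutines along the following lines. This is a sketch only, using the newer async/await syntax; it assumes (as in your Option 2) that asyncio.wrap_future accepts the AsyncResult returned by apply_async, and the pipeline/run_all names are just illustrative:

    import asyncio

    async def pipeline(view, x):
        # chain task1 -> task2 -> task3 for a single input,
        # awaiting each stage before submitting the next
        r1 = await asyncio.wrap_future(view.apply_async(task1, x))
        r2 = await asyncio.wrap_future(view.apply_async(task2, r1))
        r3 = await asyncio.wrap_future(view.apply_async(task3, r2))
        return r3

    async def run_all(view, inputs):
        # start one pipeline per input; the pipelines run concurrently
        return await asyncio.gather(*(pipeline(view, x) for x in inputs))

    loop = asyncio.get_event_loop()
    results3 = loop.run_until_complete(run_all(view, [1, 2, 3]))
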

  • Option 2: dask.distributed

    Full disclosure: I'm the primary author of IPython Parallel, about to suggest that you use a different tool.

    It is possible to pass results from one task to another via engine namespaces and DAG dependencies in IPython parallel (see the sketch below), but honestly, if your workflow looks like this, you should consider using dask.distributed, which is designed specifically for this kind of computation graph. If you are already comfortable with IPython parallel, getting started with dask should not be too much of a burden.
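
    For completeness, a rough sketch of that engine-namespace route might look like the following. This is a sketch only: the helper names are illustrative, and it relies on the load-balanced view's after= and follow= scheduling dependencies to run the dependent task after, and on the same engine as, the task that produced the intermediate result.

    def task1_store(x):
        ## Leave the intermediate result in the engine's namespace
        ## instead of returning it to the client.
        globals()['intermediate'] = x * 2

    def task2_from_engine():
        ## Read the intermediate result left behind by task1_store on this engine.
        return intermediate * 3

    ar1 = view.apply_async(task1_store, 1)
    ## after= waits for ar1 to finish; follow= schedules on the engine that ran ar1.
    with view.temp_flags(after=ar1.msg_ids, follow=ar1.msg_ids):
        ar2 = view.apply_async(task2_from_engine)
    print(ar2.get())  ## 6
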

    IPython Parallel 5.1 provides a handy method for turning your IPython parallel cluster into a dask.distributed cluster:

    import ipyparallel as ipp
    client = ipp.Client()
    executor = client.become_distributed(ncores=1)
    

    The key relevant feature of dask here is that you can submit futures as arguments to subsequent map calls; the scheduler passes the results along when they are ready, rather than you having to do it explicitly in the client:

    results1 = executor.map(task1, [1, 2, 3])
    results2 = executor.map(task2, results1)
    results3 = executor.map(task3, results2)
    executor.gather(results3)
    

    So basically, dask.distributed works the way you wish IPython parallel's load balancing would work when you need to chain things like this.

    This notebook illustrates both examples.