
Dask: Continue with other tasks if one fails


I have a simple (but large) task graph in Dask. Here is a code example:

import dask

results = []

for params in SomeIterable:
    a = dask.delayed(my_function)(**params)
    b = dask.delayed(my_other_function)(a)
    results.append(b)

# unpack the list so each delayed object is passed as its own argument
dask.compute(*results)


Here SomeIterable is a list of dicts, each holding the keyword arguments for my_function. In each iteration b depends on a, so if the task that produces a fails, b can't be computed. However, the elements of results are independent of each other, so I would expect that if one fails, the others can continue running. This does not happen in practice: if an element of results fails, the execution of the script ends.
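
A minimal, self-contained reproduction of this behaviour might look like the sketch below. The bodies of my_function and my_other_function and the contents of SomeIterable are hypothetical stand-ins, not taken from the question, and the synchronous scheduler is used only to keep the run deterministic. dask.compute re-raises the exception from the failed task, so none of the successful branches' results are returned, even though computing those branches on their own works fine.

```python
import dask


def my_function(x):
    # Hypothetical stand-in: fails for one particular input.
    if x == 2:
        raise ValueError(f"bad input: {x}")
    return x * 10


def my_other_function(a):
    # Hypothetical stand-in for the dependent step.
    return a + 1


SomeIterable = [{"x": 1}, {"x": 2}, {"x": 3}]

results = []
for params in SomeIterable:
    a = dask.delayed(my_function)(**params)
    b = dask.delayed(my_other_function)(a)
    results.append(b)

try:
    # One failing branch makes the whole call raise.
    computed = dask.compute(*results, scheduler="synchronous")
except ValueError as exc:
    computed = None
    print("dask.compute raised:", exc)

# The independent branches compute fine when the failing one is left out.
ok = dask.compute(results[0], results[2], scheduler="synchronous")
print(ok)
```

In other words, the branches are independent in the graph, but a single dask.compute call is all-or-nothing with respect to exceptions.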

EDIT:

This also happens when using the submit (or map) method of the client class dask.distributed.Client, for example:

futures = [client.submit(my_other_function_2, **params) for params in MyOtherIterable]
# ft.result() re-raises the exception of a failed task, which ends the comprehension
results = [ft.result() for ft in futures]

In the code above, if one task fails, the whole script fails as soon as I try to gather its result, as described in the docs.
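
Since Future.result() re-raises the task's exception, one way to keep the loop going is to catch that exception per future. The sketch below assumes a local in-process cluster (Client(processes=False)) purely for illustration; my_other_function_2 and MyOtherIterable are hypothetical stand-ins. Failed entries keep their exception object, so results stays aligned with the inputs.

```python
from dask.distributed import Client


def my_other_function_2(x):
    # Hypothetical stand-in: fails for one particular input.
    if x == 2:
        raise ValueError(f"bad input: {x}")
    return x * 10


MyOtherIterable = [{"x": 1}, {"x": 2}, {"x": 3}]

with Client(processes=False) as client:
    futures = [client.submit(my_other_function_2, **params) for params in MyOtherIterable]

    results = []
    for ft in futures:
        try:
            # result() re-raises the task's exception, so handle it per future
            results.append(ft.result())
        except Exception as exc:
            # Keep the exception as a placeholder instead of aborting the loop.
            results.append(exc)

print(results)
```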


Solution

  • An easy way around this is to wrap your functions in try/except, something like this:

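    import dask

    def try_f(params):
        try:
            a = my_function(**params)
            b = my_other_function(a)
        except Exception:
            # Return a marker instead of raising, so one bad input
            # does not abort the whole dask.compute call.
            b = f"Failed for: {params}"
        return b

    results = [dask.delayed(try_f)(params) for params in SomeIterable]
    # one value (or failure marker) per element of SomeIterable
    computed = dask.compute(*results)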
    

    Depending on your case, you might instead want to use the client.submit API, which gives you more flexibility, e.g. specifying retries for tasks that may fail.
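
A sketch of the client.submit route, keeping the a → b dependency from the question. It assumes a local in-process cluster (Client(processes=False)) and hypothetical stand-ins for my_function, my_other_function and SomeIterable. The retries argument of Client.submit re-runs a failed task a limited number of times, which helps with transient errors (the deterministic failure here still fails). A failure in a only errs its own b; after wait(), each future's status tells you which chains succeeded.

```python
from dask.distributed import Client, wait


def my_function(x):
    # Hypothetical stand-in: fails for one particular input.
    if x == 2:
        raise ValueError(f"bad input: {x}")
    return x * 10


def my_other_function(a):
    # Hypothetical stand-in for the dependent step.
    return a + 1


SomeIterable = [{"x": 1}, {"x": 2}, {"x": 3}]

with Client(processes=False) as client:
    final = []
    for params in SomeIterable:
        # Retry a failing task up to two more times before marking it as erred.
        a = client.submit(my_function, retries=2, **params)
        # b depends on a; if a errs, only this chain is affected.
        b = client.submit(my_other_function, a)
        final.append(b)

    # Block until every chain has either finished or erred (does not raise).
    wait(final)

    succeeded = client.gather([f for f in final if f.status == "finished"])
    failed = {i: f.exception() for i, f in enumerate(final) if f.status == "error"}

print(succeeded, failed)
```

Compared with the try/except wrapper, this keeps the real exception objects and lets the scheduler handle retries, at the cost of managing futures yourself.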