I have a simple (but large) task graph in Dask. Here is a code example:
```python
import dask

results = []
for params in SomeIterable:
    a = dask.delayed(my_function)(**params)
    b = dask.delayed(my_other_function)(a)
    results.append(b)
dask.compute(*results)  # unpack the list of delayed objects
```
Here `SomeIterable` is a list of `dict`s, each holding the arguments to `my_function`. In each iteration `b` depends on `a`, so if the task that produces `a` fails, `b` can't be computed. But the elements of `results` are independent of each other, so I expect that if one fails, the others can continue running. This does not happen in practice: if any element of `results` fails, the execution of the script ends.
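A minimal reproduction of the behaviour described above might look like the sketch below. The bodies of `my_function` and `my_other_function` are hypothetical stand-ins (the first one fails for `x == 2`), and the synchronous scheduler is an assumption chosen only to make the demo deterministic.

```python
import dask


def my_function(x):
    # hypothetical first step: fails for one particular input
    if x == 2:
        raise ValueError(f"my_function failed for x={x}")
    return x * 2


def my_other_function(a):
    # hypothetical second step that depends on my_function's output
    return a + 1


SomeIterable = [{"x": x} for x in range(4)]
results = [
    dask.delayed(my_other_function)(dask.delayed(my_function)(**params))
    for params in SomeIterable
]

try:
    dask.compute(*results, scheduler="synchronous")
    error = None
except ValueError as exc:
    # a single failing branch makes the whole compute() call raise
    error = exc
```

Only one of the four independent branches is broken, yet `dask.compute` raises instead of returning the three good values.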
EDIT:
This also happens when using the `submit` (or `map`) method of the client class `dask.distributed.Client`, for example:
```python
from dask.distributed import Client
client = Client()
futures = [client.submit(my_other_function_2, **params) for params in MyOtherIterable]
results = [ft.result() for ft in futures]
```
In the code above, if one task fails, its exception is raised when I try to gather its result, and the whole script fails, as described in the docs.
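One way to see where the failure actually happens is to wait for all futures and inspect their status before gathering. This is a sketch under assumptions: `my_other_function_2` is given a hypothetical body that fails for `x == 2`, and `Client(processes=False)` (an in-process cluster) is used only to keep the demo lightweight.

```python
from dask.distributed import Client, wait


def my_other_function_2(x):
    # hypothetical task that fails for one input
    if x == 2:
        raise ValueError(f"failed for x={x}")
    return x * 10


client = Client(processes=False)  # in-process cluster, enough for a demo
MyOtherIterable = [{"x": x} for x in range(4)]
futures = [client.submit(my_other_function_2, **params) for params in MyOtherIterable]

wait(futures)  # block until every task has either finished or errored
statuses = [ft.status for ft in futures]

try:
    # the list comprehension stops at the first future whose task failed
    results = [ft.result() for ft in futures]
    first_error = None
except ValueError as exc:
    results = None
    first_error = exc

client.close()
```

The independent tasks do complete (their status is `"finished"`); it is the `.result()` call on the failed future that raises and aborts the gathering loop.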
An easy way out of this is to wrap your functions in `try`/`except`, something like this:
```python
import dask

def try_f(params):
    try:
        a = my_function(**params)
        b = my_other_function(a)
    except Exception:  # avoid a bare except, which also swallows KeyboardInterrupt
        b = f"Failed for: {params}"
    return b

results = [dask.delayed(try_f)(params) for params in SomeIterable]
computed = dask.compute(results)
```
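A variation on the same wrapper, sketched here under assumptions, returns a `(status, value)` pair so that failures can be told apart from real results and the exception object is kept for inspection. `safe_pipeline` is a hypothetical name, the step functions have hypothetical bodies, and the synchronous scheduler is used only for a deterministic demo.

```python
import dask


def my_function(x):
    # hypothetical first step: fails for x == 2
    if x == 2:
        raise ValueError(f"bad x: {x}")
    return x * 2


def my_other_function(a):
    # hypothetical second step
    return a + 1


def safe_pipeline(params):
    """Run both steps; return ("ok", value) or ("error", exception)."""
    try:
        return ("ok", my_other_function(my_function(**params)))
    except Exception as exc:  # keep the exception object for later inspection
        return ("error", exc)


SomeIterable = [{"x": x} for x in range(4)]
tasks = [dask.delayed(safe_pipeline)(params) for params in SomeIterable]
(outcomes,) = dask.compute(tasks, scheduler="synchronous")

succeeded = [value for status, value in outcomes if status == "ok"]
failed = [value for status, value in outcomes if status == "error"]
```

Because every task now returns normally, `dask.compute` finishes and the failed inputs can be filtered out or retried afterwards.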
However, depending on your case, you might want to use the `client.submit` API, since it gives you some further flexibility, e.g. specifying some conditional retries.
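As a sketch of that flexibility, `client.submit` accepts a `retries` argument, and each future can be gathered on its own so one failure does not stop the rest. Assumptions: `flaky_task` and `marker_dir` are hypothetical, the task fails on its first attempt only (tracked with a marker file so the behaviour does not depend on in-memory state), and `Client(processes=False)` is an in-process cluster used just for the demo.

```python
import os
import tempfile

from dask.distributed import Client


def flaky_task(x, marker_dir):
    """Hypothetical task that fails the first time it runs for a given x."""
    marker = os.path.join(marker_dir, f"attempt-{x}")
    if not os.path.exists(marker):
        open(marker, "w").close()  # record that the first attempt happened
        raise RuntimeError(f"transient failure for {x}")
    return x + 100


marker_dir = tempfile.mkdtemp()
client = Client(processes=False)  # in-process cluster, enough for a demo

# retries=1 lets the scheduler rerun a failed task once before giving up
futures = [client.submit(flaky_task, x, marker_dir, retries=1) for x in range(3)]

results = []
for ft in futures:
    try:
        results.append(ft.result())
    except Exception as exc:  # reached only if the task still fails after retrying
        results.append(f"Failed: {exc!r}")

client.close()
```

Each task fails once, is retried by the scheduler, and succeeds on the second attempt; a task that kept failing would end up as a `"Failed: ..."` entry instead of aborting the loop.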