Consider a simple workflow like this:
from dask.distributed import Client
import time
with Client() as client:
    futs = client.map(time.sleep, list(range(10)))
The above code will submit the futures and then almost immediately cancel them, since the context manager closes right away. It's possible to keep the context manager open until the tasks are completed with client.gather; however, that will block further execution in the current process.
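For instance, a minimal sketch of that blocking variant, using the same toy workload:

from dask.distributed import Client
import time

with Client() as client:
    futs = client.map(time.sleep, list(range(10)))
    # Waiting on the futures keeps the context manager open, but it also
    # blocks everything else in this process until the sleeps finish.
    client.gather(futs)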
I am interested in submitting tasks to multiple clusters (e.g. local and distributed) within the same process, ideally without blocking the current process. It's straightforward to do with explicit definition of different clients and clusters, but is it also possible with context managers (one for each unique client/cluster)?
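For context, the explicit version I have in mind looks roughly like this (the SLURMCluster resource values are placeholders, and any remote cluster class could stand in for it):

from dask.distributed import Client, LocalCluster
from dask_jobqueue import SLURMCluster
import time

# Local cluster
local_cluster = LocalCluster()
local_client = Client(local_cluster, set_as_default=False)

# Remote (SLURM) cluster; resource values are illustrative only
slurm_cluster = SLURMCluster(cores=1, memory="2GB")
slurm_cluster.scale(jobs=1)
slurm_client = Client(slurm_cluster, set_as_default=False)

# Submit to both without blocking this process
local_futs = local_client.map(time.sleep, list(range(10)))
slurm_futs = slurm_client.map(time.sleep, list(range(10)))

# ... do other work, then gather and clean up explicitly
local_client.gather(local_futs)
slurm_client.gather(slurm_futs)
for obj in (local_client, slurm_client, local_cluster, slurm_cluster):
    obj.close()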
It might sound like a bit of an anti-pattern, but maybe there is a way to close the cluster only after all the submitted futures have finished running. I tried fire_and_forget and also tried passing shutdown_on_close=False, but that doesn't seem to be implemented.
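For reference, the fire_and_forget attempt was roughly this sketch; it keeps the tasks registered on the scheduler, but not the cluster alive:

from dask.distributed import Client, fire_and_forget
import time

with Client() as client:
    futs = client.map(time.sleep, list(range(10)))
    # fire_and_forget asks the scheduler to keep running the tasks even once
    # no local futures reference them, but the cluster behind this default
    # Client is still torn down when the with-block exits.
    fire_and_forget(futs)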
For some Dask cluster/scheduler types, such as the dask-cloudprovider ECSCluster, the approach described above using the with block and shutdown_on_close=False would work fine.
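For example, a sketch along these lines (the AWS-specific arguments that ECSCluster normally needs are omitted, the dask_cloudprovider.aws import path is assumed, and fire_and_forget is carried over from the approach in the question):

from dask.distributed import Client, fire_and_forget
from dask_cloudprovider.aws import ECSCluster
import time

# shutdown_on_close=False is forwarded to SpecCluster, so the ECS cluster
# is left running when the with-block exits (AWS-specific arguments omitted)
with ECSCluster(shutdown_on_close=False) as cluster:
    with Client(cluster, set_as_default=False) as client:
        futs = client.map(time.sleep, list(range(10)))
        fire_and_forget(futs)  # keep the tasks alive after the client closes

# execution continues here immediately; the cluster has to be shut down
# later through a separate client/cluster handle or the cloud console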
Both ECSCluster and SLURMCluster are derived from SpecCluster. However, ECSCluster passes its **kwargs (including shutdown_on_close) down to the SpecCluster constructor via this call:

super().__init__(**kwargs)

(see the ECSCluster code here).
SLURMCluster does not: it calls the JobQueueCluster constructor, which in turn instantiates SpecCluster with only a subset of its parameters:
super().__init__(
    scheduler=scheduler,
    worker=worker,
    loop=loop,
    security=security,
    silence_logs=silence_logs,
    asynchronous=asynchronous,
    name=name,
)
See the JobQueueCluster code here.
Therefore SLURMCluster/JobQueueCluster is ignoring shutdown_on_close (and other optional parameters). Looks like an update to JobQueueCluster would be required for your use case.
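Until then, one possible workaround is a thin wrapper context manager that deliberately does not close the cluster on exit, approximating shutdown_on_close=False for cluster classes that ignore it. This is a sketch only, with illustrative SLURMCluster arguments and a hypothetical keep_alive helper:

from contextlib import contextmanager
from dask.distributed import Client, fire_and_forget
from dask_jobqueue import SLURMCluster
import time

@contextmanager
def keep_alive(cluster):
    # Yield the cluster but deliberately skip cluster.close() on exit
    yield cluster

with keep_alive(SLURMCluster(cores=1, memory="2GB")) as cluster:  # illustrative resources
    cluster.scale(jobs=1)
    with Client(cluster, set_as_default=False) as client:
        futs = client.map(time.sleep, list(range(10)))
        fire_and_forget(futs)  # keep the tasks running after the client closes

# the SLURM cluster is still up here; call cluster.close() once the work is done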