Tags: python, dask, dask-distributed, contextmanager, fire-and-forget

dask clusters with context manager


Consider a simple workflow like this:

from dask.distributed import Client
import time

with Client() as client:
    futs = client.map(time.sleep, list(range(10)))

The above code submits the futures and then almost immediately cancels them, because the context manager closes as soon as the with block exits. It's possible to keep the context manager open until the tasks are completed with client.gather, but that blocks further execution in the current process.
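
For illustration, a minimal sketch of that blocking workaround: gathering inside the with block keeps the context manager open until every task has finished, at the cost of stalling the rest of the current process.

import time
from dask.distributed import Client

with Client() as client:
    futs = client.map(time.sleep, list(range(10)))
    results = client.gather(futs)  # blocks here until all tasks complete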

I am interested in submitting tasks to multiple clusters (e.g. local and distributed) within the same process, ideally without blocking it. This is straightforward to do by defining the different clients and clusters explicitly (as sketched below), but is it also possible with context managers (one for each unique client/cluster)?
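
A rough sketch of that explicit approach, assuming one local cluster and one already-running remote scheduler (the remote address is a placeholder, not a real endpoint):

import time
from dask.distributed import Client, LocalCluster

local_cluster = LocalCluster()
local_client = Client(local_cluster)
remote_client = Client("tcp://remote-scheduler:8786")  # placeholder address

# Neither map call blocks the current process.
local_futs = local_client.map(time.sleep, range(5))
remote_futs = remote_client.map(time.sleep, range(5))

# Clean up explicitly once the futures are no longer needed:
# local_client.close(); remote_client.close(); local_cluster.close()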

It might sound like a bit of an anti-pattern, but maybe there is a way to close the cluster only after all of the futures have finished running. I tried fire_and_forget and also tried passing shutdown_on_close=False, but that option doesn't seem to be implemented.
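
For reference, a sketch of the fire_and_forget attempt: it tells the scheduler to keep running tasks even after the client releases the futures, but it does nothing to keep the cluster itself alive once the with block exits.

import time
from dask.distributed import Client, fire_and_forget

with Client() as client:
    futs = client.map(time.sleep, list(range(10)))
    fire_and_forget(futs)
# The implicit LocalCluster started by Client() is still torn down here,
# so the fired-and-forgotten tasks are lost anyway.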


Solution

  • For some Dask cluster/scheduler types, such as the dask-cloudprovider ECSCluster, the approach described above using the with block and shutdown_on_close=False would work fine (a sketch follows at the end of this answer).

    Both ECSCluster and SLURMCluster are derived from SpecCluster. However, ECSCluster passes its **kwargs (including shutdown_on_close) down to the SpecCluster constructor via this call:

    super().__init__(**kwargs)
    

    (see the ECSCluster code here)

    SLURMCluster does not: it calls the JobQueueCluster constructor which in turn instantiates SpecCluster with only a subset of its parameters:

    super().__init__(
        scheduler=scheduler,
        worker=worker,
        loop=loop,
        security=security,
        silence_logs=silence_logs,
        asynchronous=asynchronous,
        name=name,
    )
    

    (see the JobQueueCluster code here)

    Therefore SLURMCluster/JobQueueCluster ignores shutdown_on_close (and the other optional parameters). It looks like an update to JobQueueCluster would be required for your use case.
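
    Based on the above, a hedged sketch of what the pattern could look like with ECSCluster, since it forwards **kwargs (including shutdown_on_close) to SpecCluster. The import path and the bare ECSCluster() call are assumptions; a real deployment needs additional constructor arguments (image, VPC, credentials, etc.) and the exact behaviour depends on your dask-cloudprovider version. This is a sketch, not a tested configuration.

    import time
    from dask.distributed import Client
    from dask_cloudprovider.aws import ECSCluster  # import path may differ by version

    # shutdown_on_close=False is forwarded to SpecCluster, so exiting the with
    # block should not tear the cluster down, letting submitted tasks finish.
    with ECSCluster(shutdown_on_close=False) as cluster:
        with Client(cluster) as client:
            futs = client.map(time.sleep, range(10))

    # The client and the local cluster handle are closed here, but the ECS
    # cluster itself keeps running and has to be shut down separately later.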