python, dask, dask-distributed, dask-dataframe, dask-delayed

Setting maximum number of workers in Dask map function


I have a Dask process that submits 100 tasks with a map function:

worker_args = .... # list of 100 elements with worker parameters

futures = client.map(function_in_worker, worker_args) 
worker_responses = client.gather(futures)

I use Docker, where each worker is a container. I have configured Docker Compose to spawn 20 workers/containers, like so:

docker-compose up -d --scale worker=20

The problem is that my machine crashes because the map function runs on all 20 workers in parallel, which makes memory and CPU usage exceed the machine's limits.

I want to keep the configuration of 20 workers because I use the workers for other functions that don't require a large amount of memory.

How can I limit the map function to, say, 5 workers in parallel?


Solution

  • Dask does not dynamically adjust worker resources depending on how many workers are idle. In the example you provided, once 20 workers are started, if only 5 of them are used, those 5 will not be allocated the resources of the remaining 15 idle workers.

    If that's acceptable (e.g. because the idle resources are being utilized by an external program), then one way to restrict the work to 5 workers is to explicitly specify them via the workers kwarg to the .map call:

    # instantiate a local cluster with 20 workers
    from distributed import Client
    c = Client(n_workers=20)
    
    # select the addresses of at most 5 workers from the scheduler's worker list
    selected_workers = list(c.scheduler_info()['workers'])[:5]
    
    # tasks submitted with the workers kwarg run only on the listed workers
    dummy_function = lambda x: x**2
    futs = c.map(dummy_function, range(10), workers=selected_workers)
    results = c.gather(futs)
    

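    In your docker-compose setup the 20 workers already exist, so rather than creating a local cluster you would connect the client to the running scheduler and pick 5 of its workers in the same way. A minimal sketch, assuming your compose file exposes a scheduler service (the address below is a placeholder):
    
    from distributed import Client
    
    # connect to the scheduler container started by docker-compose (placeholder address)
    c = Client('tcp://scheduler:8786')
    
    # restrict the memory-heavy map to 5 of the 20 available workers
    selected_workers = list(c.scheduler_info()['workers'])[:5]
    futures = c.map(function_in_worker, worker_args, workers=selected_workers)
    worker_responses = c.gather(futures)
    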
    Another way to control workload allocation is to use the resources kwarg; see these related answers: 0, 1, 2, 3.
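
    As a rough sketch of the resources approach (assuming each container runs dask-worker, and using an arbitrary resource name FOO and a placeholder scheduler address), you start every worker with a fixed amount of an abstract resource and declare how much of it each task consumes; the scheduler then limits how many such tasks run concurrently on each worker:
    
    # in each worker container, start dask-worker with one unit of "FOO", e.g.:
    #   dask-worker tcp://scheduler:8786 --resources "FOO=1"
    
    from distributed import Client
    
    c = Client('tcp://scheduler:8786')  # placeholder scheduler address
    
    # each task requires one unit of "FOO", so at most one of these tasks
    # runs per worker at a time; tasks submitted without resources are unaffected
    futures = c.map(function_in_worker, worker_args, resources={'FOO': 1})
    worker_responses = c.gather(futures)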