
Dask futures using SSHCluster client not parallelizing


I'm new to Dask, and I'm trying to build a simple example using futures, but I can't get it to work. My goal is to use futures to get the hostname of every node in my cluster. The cluster has 3 nodes: one scheduler and 2 workers. To do that, I wrote a function that sleeps for 2 seconds and then returns the hostname. However, when I launch the function many times, I only get the hostname of one node instead of both workers. Here is my code and output:

#!/usr/bin/python3
from dask.distributed import Client, SSHCluster
from time import sleep
import dask
from dask import delayed
import socket
import time


# The first host runs the scheduler; the remaining hosts run workers
cluster = SSHCluster(
    ["10.20.3.1", "c002-interconnect-1", "c003-interconnect-1"],
    connect_options={"known_hosts": None},
    worker_options={},
    scheduler_options={"port": 0, "dashboard_address": ":8788"},
)
client=Client(cluster)
client.cluster.scale(2)

def gethost():
    sleep(2)
    return socket.gethostname()



futures=[]
workers=(client.scheduler_info()['workers'])
print("workers details")


start=time.time()

for i in workers.keys():
    l = workers[i]['nthreads']          # number of threads on this worker
    l2 = workers[i]['metrics']['cpu']   # CPU usage metric reported by the worker
    print("worker :"+i)
    print('number of cpu: ')
    print(l2)
    print('number of threads:')
    print(l)
    print("##########")
    for j in range(int((l*(l2+10)))):
        future=client.submit(gethost)
        futures.append(future)

results=[future.result() for future in futures]

end=time.time()
print("results:")
print(results)
print("time: ")
print(end-start)

And here is the output I get:


distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.deploy.ssh - INFO - /usr/local/lib/python3.6/site-packages/distributed/node.py:155: UserWarning: Port 8788 is already in use.
distributed.deploy.ssh - INFO - Perhaps you already have a cluster running?
distributed.deploy.ssh - INFO - Hosting the HTTP server on port 39850 instead
distributed.deploy.ssh - INFO - http_address["port"], self.http_server.port
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - Clear task state
distributed.deploy.ssh - INFO - distributed.scheduler - INFO -   Scheduler at:     tcp://10.20.3.1:41283
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.20.3.3:46321'
distributed.deploy.ssh - INFO - distributed.nanny - INFO -         Start Nanny at: 'tcp://10.20.3.2:32907'
distributed.deploy.ssh - INFO - distributed.worker - INFO -       Start worker at:      tcp://10.20.3.3:35165
distributed.deploy.ssh - INFO - distributed.worker - INFO -       Start worker at:      tcp://10.20.3.2:33232
workers details
worker :tcp://10.20.3.2:33232
number of cpu:
0.0
number of threads:
12
##########
worker :tcp://10.20.3.3:35165
number of cpu:
0.0
number of threads:
12
##########
results:
['c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002']
time:
3.2259795665740967

If you need more details, please let me know. This is also my first Stack Overflow question, so let me know if I did something wrong!
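Working through the numbers in this output helps narrow the problem down. The plain-Python sketch below copies the worker details from the printed output and repeats the loop's formula: each worker reports 12 threads and a cpu metric of 0.0, so int(12 * (0.0 + 10)) = 120 futures per worker, 240 in total. Assuming the tasks were spread perfectly over all 24 threads, 240 two-second tasks would need at least 20 seconds, yet the run finished in about 3.2 seconds, roughly one 2-second sleep plus overhead. That suggests gethost actually ran only once.

```python
# Worker details copied from the printed output above
workers = {
    "tcp://10.20.3.2:33232": {"nthreads": 12, "cpu": 0.0},
    "tcp://10.20.3.3:35165": {"nthreads": 12, "cpu": 0.0},
}
TASK_SECONDS = 2  # gethost() sleeps for 2 seconds

# Same formula as the submission loop: int(nthreads * (cpu + 10)) futures per worker
per_worker = {addr: int(w["nthreads"] * (w["cpu"] + 10)) for addr, w in workers.items()}
total_tasks = sum(per_worker.values())
total_threads = sum(w["nthreads"] for w in workers.values())

# Lower bound on wall time if every task really ran, assuming perfect spreading over all threads
min_wall_time = total_tasks * TASK_SECONDS / total_threads

print(per_worker)
print(total_tasks, total_threads, min_wall_time)  # 240 24 20.0
```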


Solution

  • It might be necessary to import socket inside the function, so that the import runs on the workers:

    def gethost():
        sleep(2)
        import socket
        return socket.gethostname()
    

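    Also, the client.submit call should pass pure=False and specify which worker should execute the function. By default, client.submit treats the function as pure, so every call to gethost() with no arguments gets the same task key and is computed only once; all the futures then share that single result. That would explain why every entry is 'c002' and why the run takes barely more than 2 seconds. pure=False gives each call its own key, and workers=[i] pins the task to worker i: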

    for i in workers.keys():
        # code skipped
        future = client.submit(gethost, pure=False, workers=[i])
    

    I cannot try at the moment, but hopefully this helps.
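    The effect of the pure=False argument can be illustrated with a toy model of key-based deduplication. This is a stdlib-only sketch, not Dask's implementation: the hypothetical helpers toy_key and toy_submit derive a key from a hash of the function and its arguments when pure=True, and from a random UUID when pure=False, loosely mirroring how client.submit names tasks. A scheduler that keeps one task per key computes all the pure calls only once.

```python
import hashlib
import uuid


def toy_key(func, args=(), pure=True):
    """Hypothetical stand-in for how a task key might be derived."""
    if pure:
        # Same function + same arguments -> same deterministic key
        digest = hashlib.sha1(repr((func.__qualname__, args)).encode()).hexdigest()
    else:
        # Impure calls get a fresh random key every time
        digest = uuid.uuid4().hex
    return f"{func.__name__}-{digest}"


def toy_submit(tasks, func, args=(), pure=True):
    """Register a task unless its key is already known; return the key."""
    key = toy_key(func, args, pure)
    tasks.setdefault(key, (func, args))
    return key


def gethost():
    # Placeholder: the real function sleeps and calls socket.gethostname()
    return "some-host"


pure_tasks, impure_tasks = {}, {}
pure_keys = [toy_submit(pure_tasks, gethost) for _ in range(240)]
impure_keys = [toy_submit(impure_tasks, gethost, pure=False) for _ in range(240)]

print(len(pure_tasks), len(impure_tasks))  # 1 240
```

    With the real client the same check is possible: the futures returned by client.submit(gethost) should all share one future.key, while those from client.submit(gethost, pure=False) should each have a different key.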