I'm new to Dask, and I'm trying to build a simple example using futures, but I can't get it to work. My goal is to get the hostname of every node in my cluster (three nodes: two workers and one scheduler) using futures. To do that, I created a function that sleeps for 2 seconds and then returns the hostname. However, when I submit the function a large number of times, I only get the hostname of one node instead of two. Here is my code and output:
#!/usr/bin/python3
from dask.distributed import Client, SSHCluster
from time import sleep
import dask
from dask import delayed
import socket
import time
cluster=SSHCluster(["10.20.3.1","c002-interconnect-1","c003-interconnect-1"],connect_options={"known_hosts":None},worker_options={},scheduler_options={"port":0,"dashboard_address":":8788"})
client=Client(cluster)
client.cluster.scale(2)
def gethost():
    sleep(2)
    return socket.gethostname()
futures=[]
workers=(client.scheduler_info()['workers'])
print("workers details")
start=time.time()
for i in workers.keys():
    l=(workers[i]['nthreads'])
    l2=(workers[i]['metrics']['cpu'])
    print("worker :"+i)
    print('number of cpu: ')
    print(l2)
    print('number of threads:')
    print(l)
    print("##########")
    for j in range(int((l*(l2+10)))):
        future=client.submit(gethost)
        futures.append(future)
results=[future.result() for future in futures]
end=time.time()
print("results:")
print(results)
print("time: ")
print(end-start)
and here is the output I get:
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.deploy.ssh - INFO - /usr/local/lib/python3.6/site-packages/distributed/node.py:155: UserWarning: Port 8788 is already in use.
distributed.deploy.ssh - INFO - Perhaps you already have a cluster running?
distributed.deploy.ssh - INFO - Hosting the HTTP server on port 39850 instead
distributed.deploy.ssh - INFO - http_address["port"], self.http_server.port
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - -----------------------------------------------
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - Clear task state
distributed.deploy.ssh - INFO - distributed.scheduler - INFO - Scheduler at: tcp://10.20.3.1:41283
distributed.deploy.ssh - INFO - distributed.nanny - INFO - Start Nanny at: 'tcp://10.20.3.3:46321'
distributed.deploy.ssh - INFO - distributed.nanny - INFO - Start Nanny at: 'tcp://10.20.3.2:32907'
distributed.deploy.ssh - INFO - distributed.worker - INFO - Start worker at: tcp://10.20.3.3:35165
distributed.deploy.ssh - INFO - distributed.worker - INFO - Start worker at: tcp://10.20.3.2:33232
workers details
worker :tcp://10.20.3.2:33232
number of cpu:
0.0
number of threads:
12
##########
worker :tcp://10.20.3.3:35165
number of cpu:
0.0
number of threads:
12
##########
results:
['c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002', 'c002']
time:
3.2259795665740967
If you want more detail, please let me know. This is also my first Stack Overflow question, so let me know if I did something wrong!
It might be necessary to import socket on the workers:
def gethost():
    sleep(2)
    import socket
    return socket.gethostname()
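Also, pass pure=False to client.submit. By default, submit assumes the function is pure, so repeated calls with identical arguments share one task key and run only once, which would explain every future returning the same hostname. The call can also specify which worker is to execute the function: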
for i in workers.keys():
    # code skipped
    future = client.submit(gethost, pure=False, workers=[i])
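To illustrate why pure=False matters in the call above, here is a toy model of key-based deduplication. It is only a sketch and not Dask's actual scheduler: ToyScheduler, make_key, and where_am_i are hypothetical names, and the round-robin worker choice is an assumption made purely for the demonstration.

```python
from collections import Counter
from itertools import count

# Hypothetical helper: supplies unique suffixes for impure tasks.
_unique = count()


def make_key(func, args, pure=True):
    """Toy key function: pure calls with equal inputs share one key."""
    base = f"{func.__name__}-{hash(args)}"
    return base if pure else f"{base}-{next(_unique)}"


class ToyScheduler:
    """Toy stand-in for a scheduler; NOT how Dask is implemented."""

    def __init__(self, workers):
        self.workers = list(workers)
        self.results = {}  # task key -> cached result
        self.runs = 0      # how many times a function actually executed

    def submit(self, func, *args, pure=True):
        key = make_key(func, args, pure)
        if key not in self.results:
            # Assumed round-robin placement, just for the demonstration.
            worker = self.workers[self.runs % len(self.workers)]
            self.results[key] = func(worker, *args)
            self.runs += 1
        return self.results[key]


def where_am_i(worker):
    """Stand-in for gethost(): reports the worker the task ran on."""
    return worker


sched = ToyScheduler(["c002", "c003"])

# Six "pure" submissions collapse into a single task.
pure_tally = Counter(sched.submit(where_am_i) for _ in range(6))

# Six "impure" submissions each get their own key and their own run.
impure_tally = Counter(sched.submit(where_am_i, pure=False) for _ in range(6))

print(pure_tally)
print(impure_tally)
print(sched.runs)
```

In this model the six pure submissions execute once on a single worker, while the impure ones are spread over both. Tallying the real results list with Counter in the same way is a quick check of how tasks were distributed across hosts.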
I cannot try at the moment, but hopefully this helps.