To get a better understanding of parallel computing, I am comparing several pieces of code.
Here is the baseline (code_piece_1).
import time
# setup
problem_size = 1e7
items = range(9)
# serial
def counter(num=0):
    junk = 0
    for i in range(int(problem_size)):
        junk += 1
        junk -= 1
    return num
def sum_list(args):
    print("sum_list fn:", args)
    return sum(args)
start = time.time()
summed = sum_list([counter(i) for i in items])
print(summed)
print('for loop {}s'.format(time.time() - start))
This code runs a time-consuming function serially (in a for loop) and produces this result:
sum_list fn: [0, 1, 2, 3, 4, 5, 6, 7, 8]
36
for loop 8.7735116481781s
Can multiprocessing be viewed as a way to implement parallel computing?
I assume yes, since the documentation says so.
Here is code_piece_2:
import multiprocessing
start = time.time()
pool = multiprocessing.Pool(len(items))
num_to_sum = pool.map(counter, items)
print(sum_list(num_to_sum))
print('pool.map {}s'.format(time.time() - start))
This code runs the same time-consuming function with multiprocessing and produces this result:
sum_list fn: [0, 1, 2, 3, 4, 5, 6, 7, 8]
36
pool.map 1.6011056900024414s
Clearly, the multiprocessing version is faster than the serial one in this particular case.
Dask is a flexible library for parallel computing in Python.
This code (code_piece_3) runs the same time-consuming function with Dask (I am not sure whether I am using Dask the right way):
from dask import delayed
@delayed
def counter(num=0):
    junk = 0
    for i in range(int(problem_size)):
        junk += 1
        junk -= 1
    return num
@delayed
def sum_list(args):
    print("sum_list fn:", args)
    return sum(args)
start = time.time()
summed = sum_list([counter(i) for i in items])
print(summed.compute())
print('dask delayed {}s'.format(time.time() - start))
I got
sum_list fn: [0, 1, 2, 3, 4, 5, 6, 7, 8]
36
dask delayed 10.288054704666138s
My CPU has 6 physical cores.
Why is Dask so much slower here, while multiprocessing is so much faster?
Am I using Dask the wrong way? If yes, what is the right way?
Note: Please discuss this particular case or other specific, concrete cases. Please do NOT answer in general terms.
In your example, Dask is slower than Python multiprocessing because you don't specify a scheduler, so Dask falls back to the multithreading backend, which is the default for dask.delayed. As mdurant has pointed out, your code is a pure-Python loop that does not release the GIL, so the threads cannot execute the task graph in parallel.
Have a look here for a good overview of the topic: https://docs.dask.org/en/stable/scheduler-overview.html
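The GIL effect can be reproduced without Dask at all. The following is a minimal sketch using only the standard library: it runs the same kind of busy loop once in a thread pool and once in a process pool. The names `run_in_pool` and `steps` are made up for this illustration, and the loop is shorter than the 1e7 iterations in the question so that the demo finishes quickly. On a standard, GIL-enabled CPython build I would expect the thread pool to take about as long as a serial loop, but the exact timings depend on your machine.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from functools import partial

# Assumption: smaller than the question's problem_size (1e7) to keep the demo short.
PROBLEM_SIZE = 2_000_000


def counter(num, steps=PROBLEM_SIZE):
    """Pure-Python busy loop; it never releases the GIL voluntarily."""
    junk = 0
    for _ in range(steps):
        junk += 1
        junk -= 1
    return num


def run_in_pool(executor_cls, items, workers, steps=PROBLEM_SIZE):
    """Map counter over items with the given executor class and time it."""
    start = time.time()
    with executor_cls(max_workers=workers) as pool:
        results = list(pool.map(partial(counter, steps=steps), items))
    return results, time.time() - start


if __name__ == "__main__":
    items = range(9)
    for cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        results, elapsed = run_in_pool(cls, items, workers=len(items))
        print(cls.__name__, sum(results), "{:.2f}s".format(elapsed))
```

The thread pool here plays the role of Dask's default threaded scheduler, and the process pool plays the role of `multiprocessing.Pool` from code_piece_2.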
For your code, you could switch to the multiprocessing backend by calling:
.compute(scheduler='processes')
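Applied to code_piece_3, the change could look roughly like this. It is a sketch, not a benchmark: `build_graph` and the `steps` parameter are hypothetical helpers added for illustration, the loop is shorter than in the question, and the `if __name__ == "__main__":` guard is there because process-based schedulers may re-import the main module in the workers.

```python
import time

from dask import delayed

# Assumption: smaller than the question's problem_size (1e7) to keep the demo short.
PROBLEM_SIZE = 2_000_000


@delayed
def counter(num, steps=PROBLEM_SIZE):
    # Same pure-Python busy loop as in the question.
    junk = 0
    for _ in range(steps):
        junk += 1
        junk -= 1
    return num


@delayed
def sum_list(args):
    return sum(args)


def build_graph(items, steps=PROBLEM_SIZE):
    """Build the lazy task graph; nothing runs until .compute() is called."""
    return sum_list([counter(i, steps) for i in items])


if __name__ == "__main__":
    graph = build_graph(range(9))
    # Run the same graph on the threaded and the process-based scheduler.
    for scheduler in ("threads", "processes"):
        start = time.time()
        total = graph.compute(scheduler=scheduler)
        print(scheduler, total, "{:.2f}s".format(time.time() - start))
```

Only the `scheduler=` argument differs between the two runs, so any timing difference you see comes from the scheduler, not from the task graph.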
If you use the multiprocessing backend, all communication between processes still has to pass through the main process. You might therefore also want to check out the distributed scheduler, where worker processes can communicate with each other directly, which is especially beneficial for complex task graphs. The distributed scheduler also supports work stealing to balance work between processes and has a web interface that provides diagnostic information about running tasks. It often makes sense to use the distributed scheduler rather than the multiprocessing scheduler even if you only want to compute on a local machine.
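A local setup with the distributed scheduler could be sketched as follows. It assumes the `distributed` package is installed alongside Dask. The choice of 6 single-threaded workers is my assumption, taken from the 6 physical cores mentioned in the question, and `build_graph` and `steps` are hypothetical helpers for this illustration.

```python
import time

from dask import delayed
from dask.distributed import Client

# Assumption: smaller than the question's problem_size (1e7) to keep the demo short.
PROBLEM_SIZE = 2_000_000


@delayed
def counter(num, steps=PROBLEM_SIZE):
    # Same pure-Python busy loop as in the question.
    junk = 0
    for _ in range(steps):
        junk += 1
        junk -= 1
    return num


@delayed
def sum_list(args):
    return sum(args)


def build_graph(items, steps=PROBLEM_SIZE):
    """Build the lazy task graph; nothing runs until .compute() is called."""
    return sum_list([counter(i, steps) for i in items])


if __name__ == "__main__":
    # Creating a Client without an address starts a local cluster and
    # registers it as the default scheduler for later .compute() calls.
    with Client(n_workers=6, threads_per_worker=1) as client:
        print("dashboard:", client.dashboard_link)
        start = time.time()
        total = build_graph(range(9)).compute()
        print("distributed", total, "{:.2f}s".format(time.time() - start))
```

While the script is running, the printed dashboard link should open the web interface mentioned above. One worker process per core with a single thread each is a reasonable starting point for GIL-bound code like this, but it is worth experimenting with other layouts.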