I am trying to use pathos to run multiprocessing from within a function, but I am seeing some odd behaviour that I can't explain:
import spacy
from pathos.multiprocessing import ProcessPool as Pool

nlp = spacy.load("es_core_news_sm")

def preworker(text, nlp):
    return [w.lemma_ for w in nlp(text)]

worker = lambda text: preworker(text, nlp)

texts = ["Este es un texto muy interesante en español"] * 10
Run this in a Jupyter cell:

%%time
pool = Pool(3)
r = pool.map(worker, texts)
The output is
CPU times: user 6.6 ms, sys: 26.5 ms, total: 33.1 ms
Wall time: 141 ms
So far so good... Now I define the exact same calculation, but inside a function:
def out_worker(texts, nlp):
    worker = lambda text: preworker(text, nlp)
    pool = Pool(3)
    return pool.map(worker, texts)
Run this in a Jupyter cell:

%%time
r = out_worker(texts, nlp)
The output now is
CPU times: user 10.2 s, sys: 591 ms, total: 10.8 s
Wall time: 13.4 s
Why is there such a large difference? My hypothesis is that in the second case a copy of the nlp object is sent to every single job, though I don't know why that would happen.
Also, how can I correctly call this multiprocessing from within a function?
Thanks
EDIT:
For reproducibility of the issue, here is a Python script that shows the situation:
import spacy
from pathos.multiprocessing import ProcessPool as Pool
import time

# Install with: python -m spacy download es_core_news_sm
nlp = spacy.load("es_core_news_sm")

def preworker(text, nlp):
    return [w.lemma_ for w in nlp(text)]

worker = lambda text: preworker(text, nlp)

texts = ["Este es un texto muy interesante en español"] * 10

st = time.time()
pool = Pool(3)
r = pool.map(worker, texts)
print(f"Usual pool took {time.time()-st:.3f} seconds")

def out_worker(texts, nlp):
    worker = lambda text: preworker(text, nlp)
    pool = Pool(3)
    return pool.map(worker, texts)

st = time.time()
r = out_worker(texts, nlp)
print(f"Pool within a function took {time.time()-st:.3f} seconds")

def out_worker2(texts, nlp, pool):
    worker = lambda text: preworker(text, nlp)
    return pool.map(worker, texts)

st = time.time()
pool = Pool(3)
r = out_worker2(texts, nlp, pool)
print(f"Pool passed to a function took {time.time()-st:.3f} seconds")
In my case, the output is the following:
Usual pool took 0.219 seconds
Pool within a function took 8.164 seconds
Pool passed to a function took 8.265 seconds
The spacy nlp object is quite heavy (a few MB). My spacy version is 3.0.3.
Instead of from pathos.multiprocessing import ProcessPool as Pool, I used from multiprocess import Pool, which is essentially the same thing. Then I tried some alternative approaches.
So, from multiprocess import Pool yields 0.1s for the "usual" case and 12.5s for the other two cases.
However:

from multiprocess import Pool
import dill
dill.settings['recurse'] = True

yields 12.5s for all three cases.
Lastly, from multiprocess.dummy import Pool yields 0.1s for all three cases.
What this tells me is that it's definitely a serialization issue, and that the serialization of globals is the key to the speed. In the first case, the default dill behavior is to try to avoid recursing through globals if possible. It is able to do this successfully for the "usual" way, but not for the other two calls inside a function.
When I first import dill and switch the behavior for globals to recurse (this is how cloudpickle does its pickling), then it's slow in all three tries (the "usual" way included).
Lastly, if I use multiprocess.dummy, and thus a ThreadPool, it doesn't need to serialize globals at all, and you can see it's fast in all cases.
Conclusion: use pathos.pools.ThreadPool or multiprocess.dummy.Pool if that's feasible. Otherwise, make sure that you are running in such a way that you aren't serializing globals.
There is a helpful tool in dill that you can use to see what's getting serialized. If you include dill.detect.trace(True), then dill spits out a bunch of codes for the objects it serializes as it recursively pickles the object and its dependencies. You have to look at the dill source code to match each code to a type (e.g. F1 is a particular type of function object, and D1 is a particular type of dictionary). You can then see how the different approaches serialize different underlying objects. I unfortunately don't have a profiler on it, so you can't see immediately where the speed hit is, but you can see the different strategies it takes.
I'd just try to avoid serializing the nlp object, or whatever else is causing the slowdown (probably the nlp object). So, for example, instead of passing the nlp object explicitly to the function, you can do this:
import spacy
from multiprocess import Pool
import time

# Install with: python -m spacy download es_core_news_sm
nlp = spacy.load("es_core_news_sm")

def preworker(text, nlp):
    return [w.lemma_ for w in nlp(text)]

worker = lambda text: preworker(text, nlp)

texts = ["Este es un texto muy interesante en espanol"] * 10

st = time.time()
pool = Pool(3)
r = pool.map(worker, texts)
pool.close(); pool.join()
print("Usual pool took {0:.3f} seconds".format(time.time()-st))

def out_worker(texts):
    worker = lambda text: preworker(text, nlp)
    pool = Pool(3)
    res = pool.map(worker, texts)
    pool.close(); pool.join()
    return res

st = time.time()
r = out_worker(texts)
print("Pool within a function took {0:.3f} seconds".format(time.time()-st))
By picking up nlp through a global reference lookup instead of passing it explicitly through the function arguments, the speed was 0.1s for both cases.
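Another way to keep the heavy object out of the pickle stream altogether (my sketch, not part of the original answer) is to load it once per worker process with a Pool initializer; multiprocess mirrors multiprocessing.Pool's initializer/initargs signature. This also works under spawn-based start methods, where child processes don't inherit the parent's globals:

import multiprocess as mp

nlp = None  # populated inside each worker by the initializer

def init_worker():
    # Each worker process loads its own copy of the model on startup,
    # so the model itself never has to be pickled.
    global nlp
    import spacy
    nlp = spacy.load("es_core_news_sm")

def lemmatize(text):
    return [w.lemma_ for w in nlp(text)]

if __name__ == "__main__":
    texts = ["Este es un texto muy interesante en español"] * 10
    with mp.Pool(3, initializer=init_worker) as pool:
        print(pool.map(lemmatize, texts))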