I want to perform parallel processing of a class method. This method writes to a dict to save the data. Within threads it works, but once the method exits, the dict appears to be untouched. Here is a simplification of my problem:
from multiprocessing import Pool

class Test():
    def __init__(self):
        self.dict1 = {str(i): i for i in range(1, 10)}

    def add_smt(self, arg):
        key, indice = arg
        self.dict1[key] += 1
        print("key: {}, indice: {}, new_value: {}\n".format(key, indice, self.dict1[key]))

    def add_smt_multithread(self):
        with Pool() as pool:
            for response in pool.imap(self.add_smt, zip(self.dict1.keys(), range(1, len(self.dict1.keys())))):
                continue

a = Test()
a.add_smt_multithread()
In this simplification, I want to add +1 to each self.dict1 key using a multiprocessing method. I was surprised that after a.add_smt_multithread, a.dict1 remains untouched. I edited the self.add_smt method to return a value and changed self.add_smt_multithread into the following working code, but I want to understand the behavior of my first attempt and maybe find an easier solution (if you know one), as the first version is clearer (in my opinion).
from multiprocessing import Pool

class Test():
    def __init__(self):
        self.dict1 = {str(i): i for i in range(1, 10)}

    def add_smt(self, arg):
        key, indice = arg
        self.dict1[key] += 1
        print("key: {}, indice: {}, new_value: {}\n".format(key, indice, self.dict1[key]))
        return [key, self.dict1[key]]

    def add_smt_multithread(self):
        with Pool() as pool:
            for response in pool.imap(self.add_smt, zip(self.dict1.keys(), range(1, len(self.dict1.keys())))):
                self.dict1[response[0]] = response[1]

a = Test()
a.add_smt_multithread()
I tried to convert add_smt into a static method, but it wasn't successful. I want to call just one method of the object to start the multiprocessing. My original code contains a method that makes HTTP requests, and I want to parallelize those.
Your primary issue is a misunderstanding. When you say "Within threads it works, but once the method exits, the dict appears to be untouched", you're making an incorrect statement: multiprocessing workers are separate processes, not threads.
The important differences are:
1. Threads share a single global memory space (with a special case for thread locals); processes each have their own independent memory space. Processes duplicate the parent process's state when they are first created (completely when using the 'fork' start method, the default on most UNIX-like OSes; an imperfect simulation thereof on Windows and macOS [the latter on Python 3.8+ only], where it defaults to the 'spawn' start method), but after that, the memory spaces are independent unless specific IPC mechanisms are used to explicitly share state. (A minimal sketch contrasting the two follows this list.)
2. Threads are limited by the GIL (on the CPython reference interpreter, and IIRC, PyPy), a single shared global lock that allows only one thread to be in the bytecode interpreter loop, working with Python-level objects, at a time. For CPU-bound code not using low-level high-performance modules (mostly third party, e.g. numpy), this means you can't extract any speedup from threads (in older versions of Python, you'd actually take a pretty nasty performance hit from GIL contention if you used a lot of threads; the new GIL introduced in CPython 3.2 reduces that problem substantially, but it still limits you to getting roughly one core's worth of performance out of threaded Python-level code). If your code is I/O-bound, or it's doing large-scale number-crunching (e.g. with 10Kx10K numpy arrays or the like), or you're on a Python interpreter without the GIL (sadly, most such interpreters are wildly out of date; Jython support for Py3 remains not production-ready according to their own docs, and IronPython is only 3.4-compatible, seven releases behind CPython), threading may help.
3. Processes are limited by IPC costs and limitations; with a Pool, every function and its arguments must be pickled (Python's serialization mechanism), sent to the worker processes over a pipe, unpickled on the worker side, and run, then the return value must be pickled, sent back to the parent, and unpickled on the parent side. If the objects in question can't be made picklable, processes don't work, and if the amount of work being done is too small relative to the amount of data that must be serialized and transmitted over the pipe, they won't produce any savings. They're also more memory-hungry on CPython (where the cyclic garbage collector touching reference counts means that, unless you do careful things with gc.freeze, even on a fork-based system most of the pages mapped into the child as copy-on-write end up being written and start consuming real memory). This also means that any changes made to the function or arguments will not be seen in the parent process (because the function and arguments received on the worker process side are copies of the parent's state, not aliases).
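To make point 1 concrete, here is a minimal sketch (the data dict and bump function are invented for illustration, not code from the question): a thread mutates the very dict the main thread sees, while a child process only mutates its own copy.
from multiprocessing import Process
from threading import Thread

data = {"hits": 0}

def bump():
    data["hits"] += 1  # mutates whichever copy of data this thread/process sees

if __name__ == '__main__':
    t = Thread(target=bump)
    t.start()
    t.join()
    print(data["hits"])  # 1: the thread shares the parent's memory, so the change is visible

    p = Process(target=bump)
    p.start()
    p.join()
    print(data["hits"])  # still 1: the child process incremented its own copy of data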
#3 is your big problem here (#1 is a problem in many similar circumstances, but in this case, you're not relying on globals). When you call pool.imap(self.add_smt, zip(self.dict1.keys(), range(1, len(self.dict1.keys())))), it's repeatedly pickling self.add_smt (along with each set of arguments for each task), and the pickled form of self.add_smt is the pickled form of self itself, plus the name add_smt and a marker saying it needs to look up the latter on the former. Pickling is recursive, so this means you pickle self.dict1 each time, sending it along as part of the task, and a copy of it is realized in each worker. The worker's copy is up-to-date with the parent (assuming no races with threads modifying it), but it's independent; the only state sent back to the parent process is the method's return value, not the updated state of self.add_smt/self/self.dict1.
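You can see that recursive pickling directly with the pickle module; this is just an illustrative sketch (assuming a reasonably recent Python 3, where bound methods are picklable), not code from the question:
import pickle

class Test():
    def __init__(self):
        self.dict1 = {str(i): i for i in range(1, 10)}

    def add_smt(self, arg):
        pass

a = Test()
blob = pickle.dumps(a.add_smt)       # pickling the bound method pickles a (including a.dict1) with it
clone = pickle.loads(blob).__self__  # the unpickled method is bound to a *copy* of a
print(clone is a)                    # False: a new object, just like the one each worker receives
clone.dict1["1"] += 1
print(a.dict1["1"])                  # still 1: mutating the copy never touches the original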
Your proposed solution "works", but:
1. It only works because the counts for any given key are always updated exactly once in the imap. If the same key were modified in two of the tasks imap creates (fast enough that the first task hasn't returned and updated the parent process), both would see the same initial value for the key, both would increment once, and the parent would end up with only a single increment taking effect, not both. (A sketch of this lost update follows the list.)
2. It's extremely inefficient; self.add_smt (and therefore self and self.dict1) are pickled, written to a pipe, and unpickled for every task (not once per imap, but once per call to self.add_smt that it triggers). That's a lot of data being serialized and round-tripped for very little work done in the workers (the code is almost certainly slower as a result, and not by a small amount).
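Here is a sketch of that lost-update race; the Counter class is invented for illustration, mirroring your return-value approach but submitting the same key twice:
from multiprocessing import Pool

class Counter:
    def __init__(self):
        self.counts = {"1": 1}

    def bump(self, key):
        self.counts[key] += 1          # increments the *worker's* copy of counts
        return key, self.counts[key]   # ship the new value back, as in your fix

if __name__ == '__main__':
    c = Counter()
    with Pool(2) as pool:
        # Two tasks for the same key: each worker starts from the parent's
        # snapshot (value 1), so each almost certainly returns ("1", 2)
        for key, value in pool.imap(c.bump, ["1", "1"]):
            c.counts[key] = value
    print(c.counts["1"])  # prints 2, not 3: one of the two increments was lost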
The real solution, for I/O-bound code where the GIL isn't a big issue (though you will need to be careful of data races), is usually to get rid of the separate processes and the data copies that come with them. You can change your import(s) from using multiprocessing (process-based) to multiprocessing.dummy (a reimplementation of the multiprocessing API backed by threads), and now the threads will all see/use the same shared data.
You do need to add locking to be correct. I suspect the specifics of CPython's GIL implementation, and your use of string keys, mean it probably works even without explicit locks, but you don't want to rely on that; even on CPython, the rules for when the GIL can transition have changed over time, so while self.dict1[key] += 1 might be effectively atomic in 3.11 (because GIL hand-offs don't currently occur during the opcodes for loading a value from a dict, incrementing it, and storing it back), that could change in even a patch release (there are no documented rules for when the GIL can't be handed off, except that it must happen either between bytecodes or because a C extension explicitly releases it). So your final code could look something like:
from multiprocessing.dummy import Pool, Lock  # Use thread-based Pool/locks, not process-based

class Test():
    def __init__(self):
        self.dict1 = {str(i): i for i in range(1, 10)}
        self._lock = Lock()

    def add_smt(self, arg):
        key, indice = arg
        # Protect all access to dict1 behind a lock
        with self._lock:
            self.dict1[key] += 1
            print("key: {}, indice: {}, new_value: {}\n".format(key, indice, self.dict1[key]))
        # Alternate approach that avoids holding the lock while printing:
        # with self._lock:
        #     # Cache the result of the increment to avoid a racy lookup in the print
        #     self.dict1[key] = val = self.dict1[key] + 1
        # print("key: {}, indice: {}, new_value: {}\n".format(key, indice, val))  # Use cached val

    def add_smt_multithread(self):
        with Pool() as pool:
            # Minor tweak: Removed .keys() from all uses; dicts already iterate by key,
            # so making a keys view just wastes time when the dict would iterate the same way.
            # I also changed it to use imap_unordered, as you don't care about result
            # ordering, and it saves a lot of work if you don't make Python queue up results
            # to preserve output ordering; imap is the slowest of the map-like calls
            # because of this work, while map and imap_unordered are extremely efficient,
            # the former due to being able to chunk up work, the latter due to having
            # the lowest possible job-tracking overhead.
            # Minor check: Should the range be from 1 to len(self.dict1)+1? As written,
            # you don't map the final key, because the range is one element smaller than the dict.
            for response in pool.imap_unordered(self.add_smt,
                                                zip(self.dict1, range(1, len(self.dict1)))):
                pass  # Minor tweak: continue implies skipping; pass means "do nothing at all"

# Lacking this import guard would break your original code even worse on Windows and macOS,
# and including it even when processes aren't involved is a good safety measure
if __name__ == '__main__':
    a = Test()
    a.add_smt_multithread()
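Since your real code is making HTTP requests, here is a hedged sketch of how the same thread-based Pool pattern might look for that use case; the Fetcher class, the URLs, and the choice of urllib.request are all assumptions for illustration, not your actual code:
from multiprocessing.dummy import Pool  # thread-based Pool: I/O waits release the GIL
from urllib.request import urlopen

class Fetcher:
    def __init__(self, urls):
        self.urls = urls
        self.results = {}  # shared dict is fine: only the main thread writes to it below

    def fetch(self, url):
        # Hypothetical request; swap in whatever HTTP client your real code uses
        with urlopen(url, timeout=10) as resp:
            return url, resp.status

    def fetch_all(self):
        with Pool() as pool:
            for url, status in pool.imap_unordered(self.fetch, self.urls):
                self.results[url] = status

if __name__ == '__main__':
    f = Fetcher(["https://example.com", "https://www.python.org"])
    f.fetch_all()
    print(f.results)
Because the workers only return values and the main thread does all the dict writes, no lock is needed here; if the workers wrote to self.results themselves, you'd want the same Lock pattern shown above.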