Sorry if this is a stupid question, but I'm having trouble understanding how managers work in Python.
Let's say I have a manager that contains a dictionary to be shared across all processes. I want to have just one process writing to the dictionary at a time, while many others read from the dictionary.
https://docs.python.org/3/library/multiprocessing.html#sharing-state-between-processes
It depends on how you write to the dictionary, i.e. whether the operation is atomic or not:
my_dict[some_key] = 9 # this is atomic
my_dict[some_key] += 1 # this is not atomic
So creating a new key and updating an existing key, as in the first line of code above, are atomic operations. But the second line of code is really multiple operations, equivalent to:
temp = my_dict[some_key]
temp = temp + 1
my_dict[some_key] = temp
So if two processes were executing my_dict[some_key] += 1 in parallel, they could both read the same value in temp = my_dict[some_key] and increment temp to the same new value, and the net effect would be that the dictionary value only gets incremented once. This can be demonstrated as follows:
from multiprocessing import Pool, Manager, Lock

def init_pool(the_lock):
    # Make the lock available to each pool process as a global
    global lock
    lock = the_lock

def worker1(d):
    for _ in range(1000):
        with lock:
            d['x'] += 1

def worker2(d):
    for _ in range(1000):
        d['y'] += 1

if __name__ == '__main__':
    lock = Lock()
    with Manager() as manager, \
            Pool(4, initializer=init_pool, initargs=(lock,)) as pool:
        d = manager.dict()
        d['x'] = 0
        d['y'] = 0
        # worker1 will serialize with a lock
        pool.apply_async(worker1, args=(d,))
        pool.apply_async(worker1, args=(d,))
        # worker2 will not serialize with a lock:
        pool.apply_async(worker2, args=(d,))
        pool.apply_async(worker2, args=(d,))
        # wait for the 4 tasks to complete:
        pool.close()
        pool.join()
        print(d)
Prints:
{'x': 2000, 'y': 1162}
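To make the lost update on 'y' concrete, here is a small hand-traced sketch of one bad interleaving. It is only an illustration: a plain dict stands in for the managed dictionary, and the names temp_a and temp_b are hypothetical stand-ins for the temporary value held by each of two processes, A and B.

```python
# Hand-simulated interleaving of two processes, A and B, each running d['y'] += 1.
# A plain dict stands in for the managed dict; the three steps are the same.
d = {'y': 0}

temp_a = d['y']      # A reads 0
temp_b = d['y']      # B reads 0 before A has written its result back
d['y'] = temp_a + 1  # A writes 1
d['y'] = temp_b + 1  # B also writes 1, overwriting A's update

print(d)  # {'y': 1}: two increments ran, but the value rose by only one
```

Holding the lock around the whole read-modify-write, as worker1 does, rules out this interleaving because B cannot read until A has written.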
Update
As far as serialization goes:
The BaseManager creates a server that by default uses a socket on Linux and a named pipe on Windows. So essentially every method you execute against a managed dictionary, for example, is pretty much a remote method call implemented with message passing. This also means that the server could be running on a different computer altogether. But these method calls are not serialized: the server handles each client connection in its own thread, so calls arriving from different processes or threads can run concurrently, and the object's methods must themselves be thread-safe.
The following is an example of creating our own managed type and having the server listen for requests, possibly from a different computer (although in this example the client is running on the same computer). The client calls increment on the managed object 1000 times across two threads, but the method implementation is not done under a lock, so the resulting value of self.x when we are all done is not 1000. Also, when we retrieve the value of x twice concurrently with the method get_x, we see that both invocations start at more or less the same time:
from multiprocessing.managers import BaseManager
from multiprocessing.pool import ThreadPool
from threading import Event, Thread, get_ident
import time

class MathManager(BaseManager):
    pass

class MathClass:
    def __init__(self, x=0):
        self.x = x

    def increment(self, y):
        # Deliberately not thread-safe: read, pause, then write back
        temp = self.x
        time.sleep(.01)
        self.x = temp + 1

    def get_x(self):
        print(f'get_x started by thread {get_ident()}', time.time())
        time.sleep(2)
        return self.x

    def set_x(self, value):
        self.x = value

def server(event1, event2):
    MathManager.register('Math', MathClass)
    manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
    manager.start()
    event1.set()  # show we are started
    print('Math server running; waiting for shutdown...')
    event2.wait()  # wait for shutdown
    print("Math server shutting down.")
    manager.shutdown()

def client():
    MathManager.register('Math')
    manager = MathManager(address=('localhost', 5000), authkey=b'abracadabra')
    manager.connect()
    math = manager.Math()
    pool = ThreadPool(2)
    pool.map(math.increment, [1] * 1000)
    results = [pool.apply_async(math.get_x) for _ in range(2)]
    for result in results:
        print(result.get())

def main():
    event1 = Event()
    event2 = Event()
    t = Thread(target=server, args=(event1, event2))
    t.start()
    event1.wait()  # server started
    client()  # now we can run client
    event2.set()
    t.join()

# Required for Windows:
if __name__ == '__main__':
    main()
Prints:
Math server running; waiting for shutdown...
get_x started by thread 43052 1629375415.2502146
get_x started by thread 71260 1629375415.2502146
502
502
Math server shutting down.
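One way to make such a class safe is to guard its state with a threading.Lock inside the methods. The following is a minimal sketch under some assumptions: SafeMathClass and run_demo are hypothetical names, the artificial sleep calls are left out, and to keep it short the object is exercised directly from a ThreadPool rather than through a manager. The same idea should apply when the class is registered with MathManager, since the instance and its lock live in the server process.

```python
from multiprocessing.pool import ThreadPool
from threading import Lock


class SafeMathClass:
    """Hypothetical thread-safe variant of MathClass."""

    def __init__(self, x=0):
        self.x = x
        self._lock = Lock()  # guards every access to self.x

    def increment(self, y):
        # The whole read-modify-write runs under the lock,
        # so no other thread can interleave between read and write.
        with self._lock:
            temp = self.x
            self.x = temp + y

    def get_x(self):
        with self._lock:
            return self.x


def run_demo(calls=1000, threads=2):
    """Call increment(1) `calls` times across `threads` threads."""
    math = SafeMathClass()
    with ThreadPool(threads) as pool:
        pool.map(math.increment, [1] * calls)
    return math.get_x()


if __name__ == '__main__':
    print(run_demo())  # 1000
```

With the lock in place every increment is applied, at the cost of the increments no longer overlapping in time.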