I have two daemon processes running. One feeds data into a shared multiprocessing.Queue, and the other takes data from this Queue and submits it into a multiprocessing.pool.apply_async. The async result is sent to another multiprocessing.Queue.
Some example code:
import multiprocessing
from multiprocessing import Process
import random
import time
def my_method(i):
return i*i
class DataFeeder:
input_queue = None
@staticmethod
def stream_to_queue(iq):
if DataFeeder.input_queue is None:
DataFeeder.input_queue = iq
while True:
time.sleep(1)
dat = random.choice(range(0, 50))
print(f"feeding {dat}")
DataFeeder.input_queue.put(dat)
class DataEater:
input_queue = None
results_queue = None
pool = None
@staticmethod
def eat(iq, rq, p):
if DataEater.input_queue is None:
DataEater.input_queue = iq
if DataEater.results_queue is None:
DataEater.results_queue = rq
if DataEater.pool is None:
DataEater.pool = p
while True:
time.sleep(0.1)
dat = DataEater.input_queue.get(0.1) # 100ms timeout
print(f"eating {dat}")
async_result = DataEater.pool.apply_async(my_method, (dat,))
print(f"async_result {async_result}")
DataEater.results_queue.put_nowait(async_result)
if __name__ == '__main__':
with multiprocessing.Manager() as m:
input_q = m.Queue()
output_q = m.Queue()
pool = m.Pool(8)
dfp = Process(target=DataFeeder.stream_to_queue, args=(input_q,), daemon=True)
dfp.start()
dep = Process(target=DataEater.eat, args=(input_q, output_q, pool), daemon=True)
dep.start()
dep.join()
dfp.join()
The resulting stacktrace is as follows:
Process Process-3:
Traceback (most recent call last):
File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Users/chhunb/PycharmProjects/micromanager_server/umanager_server/multiproctest.py", line 54, in eat
async_result = DataEater.pool.apply_async(my_method, (dat,))
File "<string>", line 2, in apply_async
File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/managers.py", line 816, in _callmethod
proxytype = self._manager._registry[token.typeid][-1]
AttributeError: 'NoneType' object has no attribute '_registry'
I've tried spawning the pool locally in the eat()
method, so that it would spawn from within the new process. This won't work because the result is an async_result
and has to be shared with other processes using the results_queue
.
I've tried completely ignoring the DataFeeder by hard coding some string into the apply-async input. Same error.
This open python issue is very similar, if not exactly the same as what i'm doing: https://github.com/python/cpython/issues/80100 The issue is still open so does that mean it was never resolved in later versions of python?
Do i have to consider a completely different design pattern to get this functionality?
The following is a hackish approach which was tested only on Python 3.11 on Windows. It may break for other than the CPython implementation, other Python versions, other OS'
Moreover it currently transfers the authentication key of the manager's server between processes (this is considered unsafe) but this could be changed if necessary with a fixed key.
import multiprocessing
from multiprocessing import Process
import threading
import random
import time
def my_method(i):
return i*i
class DataFeeder:
input_queue = None
@staticmethod
def stream_to_queue(iq):
if DataFeeder.input_queue is None:
DataFeeder.input_queue = iq
while True:
time.sleep(1)
dat = random.choice(range(0, 50))
print(f"feeding {dat}")
DataFeeder.input_queue.put(dat)
class DataEater:
input_queue = None
results_queue = None
pool = None
@staticmethod
def eat(iq, rq, p, adr, auth, registry):
if DataEater.input_queue is None:
DataEater.input_queue = iq
if DataEater.results_queue is None:
DataEater.results_queue = rq
if DataEater.pool is None:
DataEater.pool = p
# Here begins the magic (aka dirty hack):
# Constructing a new manager connected to the same server process
# as the manager in main thread
from multiprocessing.managers import SyncManager
manager = SyncManager(address=adr, authkey=auth)
# Updating the registry:
registry.update(SyncManager._registry)
SyncManager._registry = registry
manager.connect()
# Setting our manager as manager of the pool proxy
DataEater.pool._manager = manager
# Continue normally
while True:
time.sleep(0.1)
dat = DataEater.input_queue.get(0.1) # 100ms timeout
print(f"eating {dat}")
async_result = DataEater.pool.apply_async(my_method, (dat,))
print(f"async_result {async_result}")
DataEater.results_queue.put_nowait(async_result)
# Additional function/process to wait for the asynchronous result and print it:
def data_printer(output_q):
while True:
item = output_q.get()
result = item.get()
print(f"awaited async result {result}")
if __name__ == '__main__':
with multiprocessing.Manager() as m:
input_q = m.Queue()
output_q = m.Queue()
pool = m.Pool(8)
dpp = Process(target=data_printer, args=(output_q,), daemon=True)
dpp.start()
dfp = Process(target=DataFeeder.stream_to_queue, args=(input_q,), daemon=True)
dfp.start()
# Three additional arguments added to create a remote manager in the process
dep = Process(target=DataEater.eat, args=(input_q, output_q, pool, m.address, m._authkey, m._registry), daemon=True)
dep.start()
dep.join()
dfp.join()
dpp.join()