I'm not very experienced with multiprocessing, but today, while writing a small HTTP + API server, I noticed I had to press the login button multiple times to get into my account. I tried to recreate the problem without http.server but couldn't, so here is a minimal version of my code. The skipped items add up in the queue, but get() ignores them; I also tried block=False and got the same result.
from multiprocessing import Queue
import time
from uuid import uuid4
import threading
import requests
from http.server import SimpleHTTPRequestHandler
import socketserver
#Simulate request
def test():
    while input() != 'q':
        try:
            requests.get('http://127.0.0.1:55467')
        except:
            pass

class database:
    def __init__(self):
        self.queue = Queue()
        self.elements = {}
        threading.Thread(target=self.db_updater, daemon=True).start()

    # The function called in API calls
    def create(self, name, data):
        self.queue.put([name, data])

    # The loop that is supposed to update the database
    def db_updater(self):
        def create_element(element_name, data):
            self.elements[element_name] = data
        while True:
            change = self.queue.get()
            print('update received', change, '\nqueue length', self.queue.qsize())
            create_element(*change)

if __name__ == '__main__':
    db = database()
    threading.Thread(target=test, daemon=True).start()

    class MyHttpRequestHandler(SimpleHTTPRequestHandler):
        def log_message(self, *args):
            pass
        def do_POST(self):
            pass
        def do_GET(self):
            print('request received')
            db.create('testuser', {'token': str(uuid4()), 'expire': int(time.time()) + 7200})
            self.send_response(200)
            self.end_headers()

    with socketserver.ForkingTCPServer(("127.0.0.1", 55467), MyHttpRequestHandler) as httpd:
        httpd.serve_forever()
The output looks like this:
request received
update received ['testuser', {'token': '86520f1e-524f-4587-8e4f-b826a0b14012', 'expire': 1712054294}]
queue length 1
request received
request received
update received ['testuser', {'token': 'a1c2a5e6-9390-4f31-a25a-df5e36f184ee', 'expire': 1712054295}]
queue length 2
request received
update received ['testuser', {'token': '08eb6fb4-5f4e-4df3-af48-28e854249c3e', 'expire': 1712054295}]
queue length 2
request received
update received ['testuser', {'token': 'dc3128ac-98d1-409b-9ca3-8c32f311b180', 'expire': 1712054295}]
queue length 2
request received
update received ['testuser', {'token': '55cc39a9-74cd-4e8c-b1fd-8db11e0b07c1', 'expire': 1712054296}]
queue length 2
request received
request received
update received ['testuser', {'token': 'a9779e1c-4a8e-45ee-8223-6022e94f67be', 'expire': 1712054296}]
queue length 3
request received
update received ['testuser', {'token': '9f8e2ab4-3c2e-4c90-ae94-be0e946e1ca6', 'expire': 1712054296}]
queue length 3
request received
request received
update received ['testuser', {'token': '6a0835c5-8568-4e53-bb9c-ec8ac9f134c0', 'expire': 1712054297}]
queue length 4
You just found a bug in CPython: the continuous forking corrupts the mutex on the queue, because Python has to unlock all inherited mutexes after forking. Can you use Manager.Queue instead? It is slower, but it doesn't have this issue.
from multiprocessing import Manager
...
self.manager = Manager()
self.queue = self.manager.Queue()
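To make the substitution concrete, here is a minimal sketch of how it could look inside the `database` class from the question; apart from the two Manager lines, everything is unchanged from the original (I dropped the inner `create_element` helper for brevity):

```python
from multiprocessing import Manager
import threading

class database:
    def __init__(self):
        # A Manager.Queue lives in a separate server process and is
        # accessed through a proxy object, so forked children never
        # share the queue's internal lock directly.
        self.manager = Manager()
        self.queue = self.manager.Queue()
        self.elements = {}
        threading.Thread(target=self.db_updater, daemon=True).start()

    # Called from the request handlers
    def create(self, name, data):
        self.queue.put([name, data])

    # Single consumer loop, same as in the question
    def db_updater(self):
        while True:
            name, data = self.queue.get()
            self.elements[name] = data
```

The proxy adds IPC overhead on every put/get, which is why it is slower than a raw multiprocessing.Queue.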
The best alternative, if possible, is to put the consumer (the database) in its own multiprocessing.Process (not a thread); this protects it from the whole mutex-corruption issue. Otherwise you need to avoid the ForkingServer, or use an alternative to multiprocessing.Queue such as a message broker. A last alternative is to implement your own queue without a mutex on the reader side, since only one reader exists.
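A minimal sketch of the Process-based variant; the function and variable names here are my own, not from the question, and the sentinel shutdown is just to show a clean exit path:

```python
from multiprocessing import Process, Queue

def db_updater(queue, done):
    # Runs in its own dedicated process: it is the only reader of the
    # queue, so the per-request forks of the server never inherit or
    # touch the reader-side state.
    elements = {}
    while True:
        change = queue.get()
        if change is None:          # sentinel: flush results and stop
            done.put(elements)
            return
        name, data = change
        elements[name] = data

if __name__ == '__main__':
    queue, done = Queue(), Queue()
    p = Process(target=db_updater, args=(queue, done))
    p.start()
    # The request handlers in the forked children would only ever put()
    queue.put(['testuser', {'token': 'abc'}])
    queue.put(None)
    print(done.get())               # {'testuser': {'token': 'abc'}}
    p.join()
```

Note that the handlers can no longer read `elements` directly; you would need a second channel (another queue, a pipe, or a Manager.dict) if the server processes must query the database as well as write to it.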