Hi, I'm observing strange behaviour with the Python multiprocessing Queue object.
My environment:
OS: Windows 10
python: 3.13.1
but I observed the same with:
OS: Windows 10
python: 3.12.7
and:
OS: Windows 10
python: 3.10.14
while I could not reproduce it on Red Hat Linux.
I have this short script:
from multiprocessing import Queue

a = list(range(716))
queue: Queue = Queue()
for item in a:
    queue.put(item)
raise ValueError(f"my len: {len(a)}")
If I run it, everything is fine: it raises the error and exits:
Traceback (most recent call last):
File "C:\Users\uXXXXXX\AppData\Roaming\JetBrains\PyCharmCE2024.3\scratches\scratch_1.py", line 7, in <module>
raise ValueError(f"my len: {len(a)}")
ValueError: my len: 716
Process finished with exit code 1
But if I change the number from 716 to 717 (or any number above it), it raises the error but doesn't exit; the script just hangs. When I forcefully stop the script, it exits with code -1:
Traceback (most recent call last):
File "C:\Users\uXXXXXX\AppData\Roaming\JetBrains\PyCharmCE2024.3\scratches\scratch_1.py", line 7, in <module>
raise ValueError(f"my len: {len(a)}")
ValueError: my len: 717
Process finished with exit code -1
Can you please help me solve this strange behaviour? I would like it to always exit automatically with code 1.
When you put an item in the queue, you are only appending it to an internal buffer; a background feeder thread incrementally moves items from that buffer into the IPC pipe. The process won't exit until the feeder thread has flushed this buffer into the pipe. The pipe has a small internal buffer of its own, which is why small item counts work fine. This design keeps queue.put non-blocking and makes the queue faster.
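A minimal sketch of this behaviour (the count of 10,000 is arbitrary; anything far beyond the pipe's buffer works): put() returns immediately for every item, and a blocking get() then waits for the feeder thread to flush each one into the pipe.

```python
import time
from multiprocessing import Queue

q: Queue = Queue()

# put() only appends to the queue's internal buffer, so it never blocks,
# even though the pipe can hold far fewer than 10_000 items at once.
start = time.perf_counter()
for i in range(10_000):
    q.put(i)
elapsed = time.perf_counter() - start
print(f"10000 puts finished in {elapsed:.4f}s")

# A blocking get() waits for the feeder thread to flush items into the
# pipe; draining everything lets the process exit cleanly afterwards.
drained = [q.get() for _ in range(10_000)]
print(f"drained {len(drained)} items")
```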
You need to make sure all items in the queue are consumed for the process to exit. You should consume all the remaining items in the queue from the main process; you can do that in a worker thread while joining the workers:
from queue import Empty

try:
    while True:
        item = queue.get_nowait()
        # process item
except Empty:
    pass
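Applied to your script, draining before the process exits removes the hang. A sketch, using get() with a short timeout instead of get_nowait() so the feeder thread has time to flush its buffer (the count of 2000 and the 0.1s timeout are arbitrary choices):

```python
from multiprocessing import Queue
from queue import Empty

a = list(range(2000))
queue: Queue = Queue()
for item in a:
    queue.put(item)

# Drain the queue so the feeder thread's buffer is fully flushed and the
# process can exit. A short timeout tolerates the feeder still writing.
drained = 0
try:
    while True:
        queue.get(timeout=0.1)
        drained += 1
except Empty:
    pass
print(f"drained {drained} of {len(a)} items")
```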
If nothing is going to read the queue and you just want to throw away the data in it, use queue.cancel_join_thread(), which allows the process to exit without the buffer being drained, potentially leaving the queue in a broken state. Only the main process should ever call it.
from multiprocessing import Queue

a = list(range(2000))
queue: Queue = Queue()
for item in a:
    queue.put(item)
queue.cancel_join_thread()
raise ValueError(f"my len: {len(a)}")
ValueError: my len: 2000
Process finished with exit code 1