So I have a Queue :
q = Queue.Queue()
And I'm putting some items in it.
items = ["First", "Second"]
for val in items:
q.put(val)
And I'm spawning 15 threads.
for i in range(15):
tname = 't-%s' % i
t = my_thread(some_func, q, tname)
t.start()
q.join()
The my_thread class looks something below:
class my_thread(threading.Thread):
def __init__(self, some_func, q_, name=''):
threading.Thread.__init__(self)
self.func = some_func
self.process_q = q_
self.name = name
self.prefix = name
def run(self):
stime = time.time()
logging.info('%s thread staring at : %s' % (threading.currentThread().getname(), time.ctime(stime)))
while True:
if self.process_q.empty():
break
queue_item = self.process_q.get()
self.name = self.prefix + '-' + queue_item
try:
#run the function
except Exception as e:
logging.error('Caught some error')
finally:
self.process_q.task_done()
endTime = time.time()
logging.info('%s thread finished at: %s' % (threading.currentThread().getName(), time.ctime(endTime)))
If i have a look at the logs, what I see is that two or more threads access the Queue at the same time and the while loop doesn't break when the Queue is empty.
Let's say t-0
thread has taken the "first"
item from the queue.
But the t-2
thread might take the "second"
item before t-1
thread can take it thus making the queue empty...but when t-1
did the self.process_q.empty()
check, the queue wasn't empty. So t-1
thread never exits/finishes and is left hanging.
If I do an strace on the process id I get the below:
Process 13307 attached
futex(0x2a5fcc0, FUTEX_WAIT_PRIVATE, 0, NULL
How do I solve this???
Your threads randomly hanging in the blocking self.process_q.get() function. -> racecondition
At the moment, the threads are started, the queue is not empty. The code part ...
...
if self.process_q.empty():
break
queue_item = self.process_q.get()
...
is not synchronized over all threads. So may more than 2 threads (queue size = 2) pass the if condition. Two threads are getting results from the self.process_q.get() function while others are blocking and waiting for results from the queue.
The python programm can not exit until all non-daemon threads are finished. So it hangs forever.
Consider to set the threads in daemon mode:
for i in range(15):
tname = 't-%s' % i
t = my_thread(some_func, q, tname)
t.setDaemon(True)
t.start()
from https://docs.python.org/2/library/threading.html#threading.Thread.daemon :
daemon
A boolean value indicating whether this thread is a daemon thread (True) or not (False). This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.
The entire Python program exits when no alive non-daemon threads are left.
By setting daemon-mode to true, the programm exits after the queue is empty (q.join()).