python multithreading queue futex

Python: Process hangs with futex(0x2a5fcc0, FUTEX_WAIT_PRIVATE, 0, NULL in multithreading


I have a Queue:

q = Queue.Queue()

And I'm putting some items in it.

items = ["First", "Second"]
for val in items:
    q.put(val)

And I'm spawning 15 threads.

for i in range(15):
   tname = 't-%s' % i
   t = my_thread(some_func, q, tname)
   t.start()

q.join()

The my_thread class looks something like this:

class my_thread(threading.Thread):
    def __init__(self, some_func, q_, name=''):
       threading.Thread.__init__(self)
       self.func = some_func
       self.process_q = q_
       self.name = name
       self.prefix = name


    def run(self):
       stime = time.time()
       logging.info('%s thread starting at: %s' % (threading.currentThread().getName(), time.ctime(stime)))
       while True:
           if self.process_q.empty():
               break
           queue_item = self.process_q.get()
           self.name = self.prefix + '-' + queue_item
           try:
               # run the function (passing the queue item as its argument is an assumption)
               self.func(queue_item)
           except Exception as e:
               logging.error('Caught some error')
           finally:
               self.process_q.task_done()


       endTime = time.time()
       logging.info('%s thread finished at: %s' % (threading.currentThread().getName(), time.ctime(endTime)))

If I look at the logs, I see that two or more threads access the queue at the same time, and the while loop doesn't break when the queue is empty.

Say thread t-0 has taken the "First" item from the queue. Thread t-2 might then take the "Second" item before t-1 can, leaving the queue empty. But when t-1 ran the self.process_q.empty() check, the queue wasn't empty yet, so t-1 never exits and is left hanging.

If I run strace on the process ID, I get the following:

Process 13307 attached
futex(0x2a5fcc0, FUTEX_WAIT_PRIVATE, 0, NULL

How do I solve this?


Solution

  • Your threads hang at random in the blocking self.process_q.get() call. This is a race condition.

    At the moment the threads are started, the queue is not empty. This part of the code ...

    ...
    if self.process_q.empty():
        break
    queue_item = self.process_q.get()
    ...
    

    is not synchronized across threads, so more than two threads (the queue holds only two items) may pass the if condition. Two of them get an item from self.process_q.get(), while the others block, waiting for an item that never arrives.

    A Python program cannot exit until all non-daemon threads have finished, so it hangs forever.
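One way to remove the race itself, rather than only its symptom, is to merge the empty() check and the get() into a single non-blocking call. The sketch below is not the code from the question: drain, run_workers, handle and n_threads are hypothetical names, and the import shim is only there so the same code runs on Python 2 (Queue) and Python 3 (queue). get_nowait() raises the Empty exception instead of blocking, so a thread that loses the race simply returns.

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2, as in the question


def drain(q, handle):
    """Process items until the queue is empty, without ever blocking."""
    while True:
        try:
            item = q.get_nowait()   # raises queue.Empty instead of blocking
        except queue.Empty:
            break                   # nothing left: let this thread finish
        try:
            handle(item)
        finally:
            q.task_done()           # balance every successful get


def run_workers(items, handle, n_threads=15):
    """Hypothetical driver: fill the queue, start workers, wait for them."""
    q = queue.Queue()
    for val in items:
        q.put(val)
    threads = [threading.Thread(target=drain, args=(q, handle))
               for _ in range(n_threads)]
    for t in threads:
        t.start()
    q.join()        # every item has been marked done
    for t in threads:
        t.join()    # every worker has returned; none is stuck in get()
    return threads
```

This only works as a shutdown condition because the queue is filled completely before the workers start. If a producer were still adding items, an empty queue would not mean the work is finished.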

    Consider running the threads in daemon mode:

    for i in range(15):
        tname = 't-%s' % i
        t = my_thread(some_func, q, tname)
        t.setDaemon(True)
        t.start()
    

    From https://docs.python.org/2/library/threading.html#threading.Thread.daemon:

    daemon

    A boolean value indicating whether this thread is a daemon thread (True) or not (False). This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

    The entire Python program exits when no alive non-daemon threads are left.

    With daemon mode set to True, the program exits once q.join() returns, that is, after task_done() has been called for every item that was put on the queue.
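To make the daemon approach concrete, here is a minimal self-contained sketch. It assumes a simplified worker rather than the my_thread class from the question: worker, process_all, results and n_threads are hypothetical names, and upper() merely stands in for the real work. The workers block in get() forever once the queue is drained, which is harmless because they are daemon threads, and q.join() is what tells the main thread that all items have been processed.

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2, as in the question


def worker(q, results):
    while True:
        item = q.get()      # blocks forever once the queue is drained
        try:
            results.append(item.upper())   # stand-in for the real work
        finally:
            q.task_done()   # lets q.join() account for this item


def process_all(items, n_threads=15):
    q = queue.Queue()
    for val in items:
        q.put(val)
    results = []
    for _ in range(n_threads):
        t = threading.Thread(target=worker, args=(q, results))
        t.daemon = True     # must be set before start()
        t.start()
    q.join()                # returns once task_done() was called per item
    return results
```

The t.daemon = True attribute is equivalent to the t.setDaemon(True) call used above. Daemon threads are stopped abruptly when the program exits, so this pattern suits workers that hold no resources needing an orderly cleanup.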