python gevent greenlets

Gevent: how to wait for a set of greenlets to finish


I have a gevent.pool.Pool of fixed size that is shared between several task producers. Each task producer can add a new greenlet to the pool whenever there are free slots. After adding its tasks to the pool, a task producer should wait until all of the tasks it added are finished.

I tried to use gevent.queue.JoinableQueue to wait until all tasks are done. It works, except that I get a very annoying exception at the end of the wait.

How can I fix the code below to avoid it? Am I doing something wrong?

from gevent import monkey, sleep; monkey.patch_all()
from gevent.queue import JoinableQueue
from gevent.pool import Pool

pool = Pool(3)


def worker(n):
    print 'Worker {} started'.format(n)
    sleep(1)
    print 'Worker {} finished'.format(n)
    return n


def main():
    results = []

    queue = JoinableQueue()
    for job_no in range(5):
        pool.wait_available()
        greenlet = pool.apply_async(worker, kwds=dict(n=job_no), callback=lambda ret: results.append(ret))
        queue.put(greenlet)
        sleep(.05)
    print 'All workers added'

    queue.join()
    print 'All workers finished', results


if __name__ == '__main__':
    main()

Output:

Worker 0 started
Worker 1 started
Worker 2 started
Worker 0 finished
Worker 3 started
Worker 1 finished
Worker 4 started
All workers added
Worker 2 finished
Worker 3 finished
Worker 4 finished
Traceback (most recent call last):
  File "main.py", line 32, in <module>
    main()
  File "main.py", line 27, in main
    queue.join()
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\queue.py", line 492, in join
    return self._cond.wait(timeout=timeout)
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 219, in wait
    return self._wait(timeout)
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 129, in _wait
    gotit = self._wait_core(timeout)
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 106, in _wait_core
    result = self.hub.switch()
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\hub.py", line 630, in switch
    return RawGreenlet.switch(self)
gevent.hub.LoopExit: ('This operation would block forever', <Hub at 0x26c1c28 select default pending=0 ref=0>)

Solution

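  • You get the 'This operation would block forever' error because JoinableQueue.join() does not wait for the greenlets themselves. It waits until task_done() has been called once for every item that was put() into the queue, and in your code nothing ever consumes the queue or calls task_done(). So join() keeps blocking after all the workers have finished, and because no other greenlet is left that could ever wake it up, gevent's hub raises LoopExit.

    JoinableQueue is not needed here; use gevent.joinall() to wait for all the greenlets to finish: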

    import gevent
    
    def main():
        results = []
        gs = []
        for job_no in range(5):
            greenlet = ..  # create the greenlet with pool.apply_async(...) as in the question
            gs.append(greenlet)
        gevent.joinall(gs)
        print 'All workers finished', results
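Filled out as a complete program, the joinall approach could look like the sketch below. It is a minimal illustration, not the answerer's exact code: it uses Pool.spawn() (which blocks while the pool is full) instead of apply_async() with a callback, reads each result from Greenlet.value, and introduces a hypothetical producer() helper with made-up job numbers so that two producers share one pool while each waits only for its own greenlets. It is written to run under both Python 2 and Python 3.

```python
import gevent
from gevent.pool import Pool

pool = Pool(3)  # fixed-size pool shared by all producers, as in the question


def worker(n):
    gevent.sleep(0.01)  # stand-in for real work
    return n


def producer(first, count):
    """Hypothetical producer: submit `count` jobs and wait only for those jobs."""
    greenlets = []
    for job_no in range(first, first + count):
        # Pool.spawn() blocks until the shared pool has a free slot.
        greenlets.append(pool.spawn(worker, job_no))
    # Wait for this producer's greenlets only; other producers' jobs are ignored.
    gevent.joinall(greenlets)
    # Greenlet.value holds each worker's return value, in submission order.
    return [g.value for g in greenlets]


def main():
    producers = [gevent.spawn(producer, 0, 5), gevent.spawn(producer, 100, 5)]
    gevent.joinall(producers)
    return [p.value for p in producers]


if __name__ == '__main__':
    print(main())  # [[0, 1, 2, 3, 4], [100, 101, 102, 103, 104]]
```

Because each producer keeps its own list of greenlets, gevent.joinall() returns as soon as that producer's jobs are done, regardless of what other producers have submitted to the shared pool. If you would rather keep the JoinableQueue, every queue.put() must be matched by a queue.task_done() call (for example, from the apply_async callback); otherwise join() can never return.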