I have a fixed-size gevent.pool.Pool shared between several task producers. Each producer can spawn a new greenlet in the pool whenever a slot is free. After adding its tasks, a producer should wait until all the tasks it added have finished.
I tried using gevent.queue.JoinableQueue to wait until all tasks are done. It works, except that an exception is raised at the end of the wait.
How can I fix the code below to avoid that? Am I doing something wrong?
from gevent import monkey, sleep; monkey.patch_all()
from gevent.queue import JoinableQueue
from gevent.pool import Pool

pool = Pool(3)

def worker(n):
    print 'Worker {} started'.format(n)
    sleep(1)
    print 'Worker {} finished'.format(n)
    return n

def main():
    results = []
    queue = JoinableQueue()
    for job_no in range(5):
        pool.wait_available()
        greenlet = pool.apply_async(worker, kwds=dict(n=job_no), callback=lambda ret: results.append(ret))
        queue.put(greenlet)
        sleep(.05)
    print 'All workers added'
    queue.join()
    print 'All workers finished', results

if __name__ == '__main__':
    main()
Output:
Worker 0 started
Worker 1 started
Worker 2 started
Worker 0 finished
Worker 3 started
Worker 1 finished
Worker 4 started
All workers added
Worker 2 finished
Worker 3 finished
Worker 4 finished
Traceback (most recent call last):
  File "main.py", line 32, in <module>
    main()
  File "main.py", line 27, in main
    queue.join()
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\queue.py", line 492, in join
    return self._cond.wait(timeout=timeout)
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 219, in wait
    return self._wait(timeout)
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 129, in _wait
    gotit = self._wait_core(timeout)
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 106, in _wait_core
    result = self.hub.switch()
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\hub.py", line 630, in switch
    return RawGreenlet.switch(self)
gevent.hub.LoopExit: ('This operation would block forever', <Hub at 0x26c1c28 select default pending=0 ref=0>)
You get the 'This operation would block forever' error because nothing consumes the items in the queue and calls task_done() on them. queue.join() therefore keeps blocking until all greenlets have finished; at that point the hub has nothing left to run, so it raises the exception.
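For comparison, queue.join() only returns once every put() has been matched by a task_done() call. A minimal sketch of that intended usage is shown here; the names consumer and process, and the doubling "work", are hypothetical and only illustrate the pattern:

```python
import gevent
from gevent.queue import JoinableQueue


def consumer(queue, results):
    # Hypothetical consumer: takes items off the queue forever.
    while True:
        item = queue.get()
        try:
            results.append(item * 2)  # stand-in for real work
        finally:
            # Without this call, queue.join() can never return.
            queue.task_done()


def process(items):
    queue = JoinableQueue()
    results = []
    consumers = [gevent.spawn(consumer, queue, results) for _ in range(2)]
    for item in items:
        queue.put(item)
    # Returns once task_done() has been called for every item put.
    queue.join()
    # The consumers are still blocked in queue.get(); stop them.
    gevent.killall(consumers)
    return results


if __name__ == '__main__':
    print(sorted(process([1, 2, 3])))
```

In the question's code no greenlet ever calls task_done(), so the unfinished-task count never drops to zero.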
JoinableQueue is not needed here; use gevent.joinall() to wait for all the greenlets to finish:
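import gevent

def main():
    results = []
    gs = []
    for job_no in range(5):
        # Same apply_async call as in the question.
        greenlet = pool.apply_async(worker, kwds=dict(n=job_no), callback=lambda ret: results.append(ret))
        gs.append(greenlet)
    # Block until every greenlet started above has finished.
    gevent.joinall(gs)
    print 'All workers finished', results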
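A self-contained sketch of the same idea might look like the following. It makes a few assumptions that are not in the original code: it uses pool.spawn() in place of apply_async() (spawn blocks while the pool is full), it reads results from Greenlet.value instead of a callback, the helper name run_jobs is hypothetical, and a short sleep stands in for real work. Monkey patching is left out because this small example does not need it.

```python
import gevent
from gevent.pool import Pool

pool = Pool(3)  # shared, fixed-size pool, as in the question


def worker(n):
    gevent.sleep(0.01)  # stand-in for real work
    return n


def run_jobs(job_count):
    # Hypothetical helper representing one task producer.
    # pool.spawn() blocks while the pool is full, so no explicit
    # wait_available() call is needed before it.
    greenlets = [pool.spawn(worker, n) for n in range(job_count)]
    # Wait only for the greenlets this producer started.
    gevent.joinall(greenlets)
    # Greenlet.value holds the return value of a finished greenlet.
    return [g.value for g in greenlets]


if __name__ == '__main__':
    print('All workers finished: {}'.format(run_jobs(5)))
```

Because each producer keeps its own list of greenlets, several producers can share the pool and each waits only for its own tasks.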