python, asynchronous, generator, gevent, eventlet

How do I feed an infinite generator to eventlet (or gevent)?


The docs of both eventlet and gevent have several examples of how to asynchronously spawn IO tasks and get the results later. But in all the examples where a value is returned from the async call, I always find a blocking call after all the calls to spawn(): either join(), joinall(), wait(), or waitall(). This assumes that calling the functions that use IO is immediate and we can jump right to the point where we are waiting for the results.
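
For reference, that pattern typically looks something like the sketch below (gevent-flavoured; fetch_title and the url list are placeholders, not code taken from the docs):

    import gevent

    def fetch_title(url):                                    # placeholder for the real IO call
        gevent.sleep(0.1)
        return 'title of ' + url

    urls = ['http://example.com/a', 'http://example.com/b']  # a small, finite list

    jobs = [gevent.spawn(fetch_title, url) for url in urls]  # spawn everything up front
    gevent.joinall(jobs)                                     # the blocking call after all spawns
    titles = [job.value for job in jobs]                     # results only available after joinall()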

But in my case I want to get the jobs from a generator that can be slow, arbitrarily large, or even infinite.

I obviously can't do this

pile = eventlet.GreenPile(pool)
for url in mybiggenerator():
    pile.spawn(fetch_title, url)
titles = '\n'.join(pile)

because mybiggenerator() can take a long time before it is exhausted. So I have to start consuming the results while I am still spawning async calls.

This is probably usually done with recourse to queues, but I'm not really sure how. Say I create a queue to hold jobs, push a bunch of jobs from a greenlet called P, and pop them from another greenlet C. When in C, if I find that the queue is empty, how do I know whether P has pushed every job it had to push or whether it is just in the middle of an iteration?
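
One common answer to that last question is a sentinel: the producer pushes a special marker object after the generator is exhausted, and the consumer stops as soon as it pops it. Below is a minimal sketch of that idea with eventlet; mybiggenerator() and fetch_title() are only stand-ins for the real generator and IO call, and the pool size and URLs are arbitrary:

    import eventlet

    DONE = object()  # sentinel meaning "the producer has pushed its last job"

    pool = eventlet.GreenPool(100)
    queue = eventlet.queue.Queue()


    def mybiggenerator():
        # stand-in for the slow/large/infinite generator
        for i in range(10):
            yield 'http://example.com/%d' % i


    def fetch_title(url):
        # stand-in for the real IO-bound call
        eventlet.sleep(0.1)
        return 'title of ' + url


    def producer():
        for url in mybiggenerator():
            queue.put(pool.spawn(fetch_title, url))  # enqueue the GreenThread
        queue.put(DONE)  # tell the consumer there is nothing left to wait for


    def consumer():
        while True:
            item = queue.get()  # blocks until the producer pushes something
            if item is DONE:
                break
            print(item.wait())  # wait() returns fetch_title's result


    eventlet.spawn(producer)
    consumer()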

Alternatively, eventlet allows me to loop through a pile to get the return values, but can I start doing this without having spawned all the jobs I have to spawn? How? This would be a simpler alternative.
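
For that simpler alternative, eventlet's GreenPool.imap appears to do exactly this: it consumes the input iterable lazily, only as workers become available, and yields results while later jobs are still being spawned. A sketch under that assumption, reusing the stand-ins from the previous snippet:

    import eventlet

    pool = eventlet.GreenPool(100)

    # imap pulls urls from mybiggenerator() lazily, only as pool slots free up,
    # and yields fetch_title's results in input order while later jobs are
    # still being spawned
    for title in pool.imap(fetch_title, mybiggenerator()):
        print(title)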


Solution

  • You don't need any pool or pile by default. They're just convenient wrappers that implement a particular strategy. First you should get an idea of how exactly your code must work under all circumstances, that is: when and why to start another greenthread, and when and why to wait for something.

    When you have answers to some of these questions and doubts about others, ask away. In the meantime, here's a prototype that processes an infinite "generator" (actually a queue).

    import eventlet

    queue = eventlet.queue.Queue(10000)
    wait = eventlet.semaphore.CappedSemaphore(1000)
    
    
    def fetch(url):
      # httplib2.Http().request
      # or requests.get
      # or urllib.urlopen
      # or whatever API you like
      return response
    
    
    def crawl(url):
      with wait:
        response = fetch(url)
        links = parse(response)  # parse() stands for your own link extraction
        for url in links:
          queue.put(url)
    
    
    def spawn_crawl_next():
      try:
        url = queue.get(block=False)
      except eventlet.queue.Empty:
        return False
      # use another CappedSemaphore here to limit number of outstanding connections
      eventlet.spawn(crawl, url)
      return True
    
    
    def crawler():
      while True:
        if spawn_crawl_next():
          continue
    
        while wait.balance != 0:
          eventlet.sleep(1)
    
        # if last spawned `crawl` enqueued more links -- process them
        if not spawn_crawl_next():
          break
    
    
    def main():
      queue.put('http://initial-url')
      crawler()


    if __name__ == '__main__':
      main()