Tags: python, multithreading, concurrency, cross-domain, rate-limiting

Rate limiting concurrent web queries to multiple domains, stdlib only


Question

How do I manage rate limiting for concurrent web queries to multiple domains, using only the Python stdlib? I'm not asking about algorithms like leaky bucket or about single-domain solutions, but about how to structure the data and code for concurrent queries to multiple domains.

Related

The following posts have useful info but don't solve my problem. They don't address concurrent requests with per-domain rate limits, and many answers use 3rd-party modules.

This question comes closest, with quite a similar setup (though I don't revisit the same links), but its approach uses an external database and PHP / Java, not Python. That's too heavy; I need a Python solution.

Setup

Downloading data from multiple domains. The typical pattern is a stream of download items, each tagged with its domain.

That's simplified a bit. In reality, more download items can be added at any time throughout the process, which is ongoing.

I can't / don't want to install 3rd-party libs from PyPI, to minimize dependencies and maintain security. The code should be self-contained, using only the Python stdlib.

Current Approach

I use a producer / consumer implementation with a single queue and several worker threads for concurrent downloads. It uses a leaky bucket for rate limiting, with timestamps (no discard; rate-limited items simply wait). Works perfectly for single-domain downloads. Conceptually like this:

import queue
import time
import multiprocessing.pool

# paused () and backofftime () come from the leaky-bucket rate limiter (not shown)

def dl_worker (que) :
    while True :
        item = que.get ()
        if paused (item.domain) :   # waiting due to rate limit
            time.sleep (backofftime (item.domain))
        # dl item...

# start pool of dl threads
dlque = queue.Queue ()
workers = multiprocessing.pool.ThreadPool (5)
with workers :
    for x in workers._pool :    # one long-running task per pool thread
        workers.apply_async (dl_worker, (dlque,))

Problem: this uses a single queue for all download items. When dl threads pause to rate limit, any items in the queue for a second domain are stuck waiting while the pause for the first domain's items elapses.

Rejected Solutions

The obvious tweak is to push rate-limited items to the back of the queue:

def dl_worker () :
    while True :
        item = dlque.get ()
        if paused (item.domain) :   # waiting due to rate limit
            dlque.put (item)        # move item to back of queue
            continue

        # process item...

This is wasteful. If the queue only contains items from domain1, workers will continually shuffle items from front to back during the backoff period. I want to avoid busy-wait solutions.

Other Solutions

The main problem with the current approach is the single queue. I came up with a few other approaches, but they all have drawbacks:

  1. Use a separate queue for each domain. There doesn't seem to be a way to wait () on multiple queues in Python (like select on file handles), so I have to manually poll all queues to see which are ready. Something like this (imagine suitable thread locks where needed):
import queue
import time

allques = {}
paused  = set ()   # domains currently in backoff (managed by the rate limiter)

def put (item) :
    que = allques.setdefault (item.domain, queue.Queue ())
    que.put (item)

def dl_worker () :
    while True :
        # find which queues have items ready to process
        active = [ x for x in allques if x not in paused ]
        ready  = [ a for a in active if allques [a].qsize () ]

        if ready :
            for domain in ready :
                try :
                    item = allques [domain].get (block = False)
                    # process item ...
                    break
                except queue.Empty :
                    # qsize was wrong, no biggie, move on to next queue
                    pass

        else :
            # wait and poll again
            time.sleep (0.5)

I dislike the sleep polling. I could use a threading.Semaphore instead, to track the number of current items across all queues (call semaphore.release on every que.put and semaphore.acquire before attempting que.get; see the sketch at the end of this option). But that doesn't help if all queues are currently in rate-limit backoff: dl_worker will busy-wait through the while loop, with ready being an empty list on each pass.

Using individual semaphores for each queue just creates the same polling problem.

This approach also doesn't preserve dl item order; it just grabs the first item from the first ready queue it finds. I could use random.shuffle on ready to at least randomize which queue is picked from. Preserving item order seems difficult: I would need a separate data structure to track insertion order across all queues. Seems like more trouble than it's worth.
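
Here's a rough sketch of that semaphore idea, mostly to show where it breaks down. It assumes the allques structure from above; the names are illustrative:

import queue, threading

allques   = {}
itemcount = threading.Semaphore (0)   # one permit per queued item

def put (item) :
    que = allques.setdefault (item.domain, queue.Queue ())
    que.put (item)
    itemcount.release ()

def dl_worker () :
    while True :
        itemcount.acquire ()       # blocks only while every queue is empty
        # scan for ready queues as in option 1...
        # if items exist but all their domains are paused, nothing is ready :
        # we have to re-release the permit and loop again, busy-waiting
        # exactly like the sleep-polling version
        itemcount.release ()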

  2. I could use two queues: active and paused. Items are popped from the active queue; if a domain is in a backoff period, its items go on the paused queue until the backoff expires. I think this has the same problem, though: I need a dispatcher thread to watch the paused queue and shuffle items back to the active queue once their backoff periods expire.

What happens when the second item on the paused queue expires sooner than the first? It's stuck waiting on the paused queue until the first item is removed and put back on the active queue.

Or I need a non-queue data structure for paused, so I can pull off any item that's ready. But then I need sleep polling again (no blocking get call available).

  3. I could use a different data structure than a queue, one that filters out items currently in a backoff period. Not sure what structure this would be, though. PriorityQueue doesn't seem to help, for two reasons.

One, its priorities are static, while mine are dynamic: hitting a rate limit triggers a pause, and the pause may lengthen while items are waiting (eg two workers grab items from the same domain at the same time; the first finishes and triggers a pause, the second finishes later and lengthens the pause).

Two, PriorityQueue always returns an item if one is available. I can't set a backoff period to say "hold this item on the queue until time X, then return it".

  4. I could use a heap instead of a queue, so I can retrieve any object. But it's hard to keep the heap in sorted order: as downloads happen and rate limits are triggered, item priority changes dynamically, and re-sorting the heap on every backoff seems inefficient. Also, Python heaps don't block on get (); they either return or throw, so I'd still need time.sleep polling.
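
For concreteness, a minimal sketch of the heap idea, showing the non-blocking problem. The seq counter is a tie-breaker I'm adding so unequal items never get compared:

import heapq, itertools, time

heap = []                  # entries are (ready_time, seq, item)
seq  = itertools.count ()

def put (item, ready_time) :
    heapq.heappush (heap, (ready_time, next (seq), item))

def try_get () :
    # non-blocking : returns an item whose time has come, else None
    if heap and heap [0][0] <= time.time () :
        return heapq.heappop (heap) [2]
    return None            # caller still has to sleep-poll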

Conclusion

So far option 1 seems like the best solution, despite its drawbacks. Does anyone have better ideas?

I'm not inherently tied to a threading model; async might work as well. I hate littering my code with async / await junk and the red / blue function problem, but if there's a cleaner solution available, it's worth considering.


Solution

  • Scheduler

    Another potential solution just came to me: use a scheduler thread. Something like this:

    Worker threads wait () on an itemsready flag, then grab the next item from an active queue; otherwise workers sleep.

    This seems like a good solution. I'm not sure if the scheduler needs one thread or two (one to sort incoming items, a second to monitor active queues), because one thread can't wait on both things simultaneously: incomingqueue.get () and backofftimer.wait (). Or maybe it can, if I compute the time until the next backoff expiration and use that as the timeout on get ().
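
    A minimal sketch of that one-thread version, using the time until the next backoff expiration as the get () timeout. The names here are illustrative, not final:

    import queue, time

    incomingque = queue.Queue ()
    backoffs    = {}               # domain -> backoff expiration time

    def scheduler () :
        while True :
            timeout = None         # block forever if no backoffs in effect
            if backoffs :
                timeout = max (min (backoffs.values ()) - time.time (), 0)
            try :
                item = incomingque.get (timeout = timeout)
                # sort item into its per-domain queue, set itemsready if unpaused
            except queue.Empty :
                # timeout hit : some backoff expired, reactivate that domain's queue
                pass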

    I'll work on code and post an update.

    Code

    This approach works beautifully. I encapsulated all the per-domain queues in a single queue class, which simplifies the algorithm: consumers access one que object, which manages separate per-domain queues internally. The item-ready flag can be updated in get / put calls, leaving the watcher thread to just monitor backoff expiration.

    import random
    import threading
    import time
    from collections import deque

    class Pauseque () :  # {
       
        # ---------------
    
        def __init__ (me, keyfunc) :
    
            me.keyfunc = keyfunc         # extract unique key (domain) from each item
            me.outques = {}              # outgoing queues sorted by domain
            me.paused  = {}              # paused queue keys with expiration time
    
            me.lock    = threading.RLock ()   # synchronize ops
            me.gotime  = threading.Event ()   # signal when items ready
            me.notime  = threading.Event ()   # signal when backoff period active
    
            # spawn new thread to watch queues for ready items
            watcher = threading.Thread (target = me._watcher, daemon = True)
            watcher.start ()
            
        # ---------------
        
        def _watcher (me) :
            'watch for queues to unpause after backoff'
    
            DEBUG and log (f'watcher started { me }')
    
            while True :
    
                try :
                    # wait for a backoff period to be in effect
                    me.notime.wait ()
                    DEBUG and log (f'watcher : wakeup, backoff in effect')
    
                    # get time here so it doesnt change after unpause checks, which skews results
                    now = time.time ()
    
                    with me.lock :
    
                        # --- check if any pauses expired
                        # slightly more efficient to copy unpause code here
    
                        for key in list (me.paused) :
                            me.unpause (key)
    
                        # --- get next wakeup time
    
                        nextcheck = 0
                        if len (me.paused) :
                            nextcheck = min (me.paused.values ())
                        else :
                            # all done, no backoffs in effect
                            # clear flag inside lock so we dont clear pauses added by an intervening put ()
                            me.notime.clear ()
    
                    # IMPORTANT - release lock before sleep !
    
                    if nextcheck :
                        delta = max (nextcheck - now, 0)  # ensure not negative
    
                        # SET MAX SLEEP TIME HERE
                        # if not, short backoff (30s) could start and end before long backoff ends
                        # watcher would never wakeup to clear short backoff
                        # for safety, sleep minimum backoff time = least wasted time
                        # sleep longer if some wasted time is tolerable
    
                        delta = min (delta, 10)
    
                        DEBUG and log (f'watcher : sleep til next check : { delta:0.2f}')
                        time.sleep (delta)
    
                    # restart loop and wait for another backoff period
    
                except Exception as e :
                    log (f'EXCEPTION : {e!r}')
    
        # ---------------
    
        def get (me) :
            'get item from queue.  blocks until ready.'
            # add block param if compatibility with queue.Queue desired
    
            DEBUG and log (f'trying get...')
    
            # --- keep looping til we get an item
    
            while True :
    
                me.gotime.wait ()
    
                DEBUG and log (f'get : its gotime!')
    
                # --- try to get an item
                # attempt may fail as other threads empty queue before we reach here
    
                with me.lock :
    
                    active = [ x for x in me.outques if x not in me.paused ]   # names of active queues
    
                    # size should be reliable under lock
                    ready = [ a for a in active if len (me.outques [a]) ]
    
                    if ready :
                        DEBUG and log (f'get : found ready queues : { ready }')
                        random.shuffle (ready)
    
                        for key in ready :
                            try :
                                item = me.outques [key].popleft ()
                                return item
    
                            except IndexError :
                                # for safety - not sure how to get here with lock?
                                log (f'deque pop failed ? { key = } : { len (me.outques [key]) }')
                                pass
    
                    DEBUG and log (f'get : nothing ready!  making gotime false')
                    me.gotime.clear ()
    
                DEBUG and log (f'get : missed our shot, back to wait')
    
        # ---------------
    
        def putfront (me, item) :
            'return item to front of que to be next taken'
    
            key = me.keyfunc (item)
    
            with me.lock :
                que = me.outques.setdefault (key, deque ())
                que.appendleft (item)
                me.gotime.set ()
    
            size = len (que)
            DEBUG and log (f'put front in queue { key } : { item } : { size = }')
    
        # ---------------
    
        def put (me, item) :
            'return item to back of que'
    
            key = me.keyfunc (item)
    
            with me.lock :
                que = me.outques.setdefault (key, deque ())   
                que.append (item)   
                me.gotime.set ()
    
            size = len (que)
            DEBUG and log (f'put in queue { key } : { item } : { size = }')
    
        # ---------------
    
        def ispaused (me, item) :
            key = me.keyfunc (item)
            with me.lock :
                return key in me.paused
    
        # ---------------
    
        def pause (me, item, sec) :
            key = me.keyfunc (item)
            with me.lock :
                me.paused [key] = time.time () + sec
                me.notime.set ()
    
            DEBUG and log (f'pausing { key } for { sec } sec')
    
        # ---------------
    
        def unpause (me, key, force = False) :
            'check if key que is ready to unpause and do it'
    
            now = time.time ()
    
            with me.lock :
                expire = me.paused.get (key, 0)
                if expire <= now or force :
                    me.paused.pop (key, 0)
                    me.gotime.set ()   # signal items ready
                    DEBUG and log (f'pausewatch : unpausing { key }')
    
        # ---------------
    
        def __len__ (me) :
            with me.lock :
                return sum ( len (x) for x in me.outques.values () )
    
        # ---------------
    
        def qsize (me) :  return len (me)  # for compatibility with queue.Queue
    
    # end Pauseque }
    
    # --- helpers
    DEBUG = False
    
    def log (msg) :
        tid   = threading.get_ident () % 10000
        stamp = f'{ time.time () % 1000:8.3f}'
        print (f'{ tid:4d} at { stamp } : { msg }', flush = True)
    

    Usage

    Pauseque acts like a typical thread-safe que: multiple threads can put () and get () objects.

    Pauseque has an additional method, pause (). Consumer threads trigger it when they want to pause a particular group of items (by domain, in my case). The que stops returning those items during the pause period and resumes returning them when the pause ends. Pauses are triggered by an external rate-limiting algorithm in the consumer threads (see Notes for why).

    The unpause () method is mostly internal, for the watcher thread. However, consumer threads may want to use it in cases of dynamic rate limiting that shorten a backoff period.
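
    For example, a consumer that learns a limit was lifted early could force an unpause (hypothetical domain key):

    que.unpause ('example.com', force = True)   # make that group's items available now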

    Convenience methods round things out: ispaused () checks whether the que is paused for a given item, and putfront () puts an item back at the front of the que (next to be taken). This is useful when multiple consumer threads check rate limits: one consumer triggers a rate limit, and the others check ispaused () right before downloading, putting the item back if its domain is currently paused.

    In terms of the code from the original post:

    def dl_worker (que) :
        while True :
            item = que.get ()

            delay = rate_limit (item)   # check if this item triggers rate limit
            if delay :
                que.pause (item, delay)
                continue

            # do some prep on item..., then -

            if que.ispaused (item) :   # check if another thread triggered rate limit
                que.putfront (item)
                continue

            # dl item...

    # start pool of dl threads
    dlque = Pauseque (get_domain)
    workers = multiprocessing.pool.ThreadPool (5)
    with workers :
        for x in workers._pool :    # one long-running task per pool thread
            workers.apply_async (dl_worker, (dlque,))
    

    Advantages

    Pauseque is much more efficient than the original version. Consumer threads never sleep; all waiting is handled by the queue. Pauseque uses the threading.Event gotime to signal when items are available, so get () calls block until signaled.

    Pauseque uses a separate watcher thread to monitor groups for pause expiration and make them available again. This check blocks on the Event notime, which is set when a pause is initiated and cleared when all groups are active again.

    The watcher thread has the only sleep () call in this code, used to wake up and check the next group's unpause time. That's much more efficient than the previous code, because it's one thread sleeping for long stretches (up to 10 seconds, per the cap in _watcher), vs every consumer thread sleeping multiple times per second.

    Performance

    Pauseque performs very well. When an unpause happens, consumer threads are processing items again within 1-2 ms of the scheduled resume time.

    Put and get operations are quite fast: 5.5us per que.put (item) ; item = que.get () cycle in a single thread.

    Notes

    keyfunc is any function that sorts queue items into groups for pausing. In my case, download items are grouped by domain. Other groupings can serve as the basis for pauses (eg group by username for traffic shaping by sender).
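
    For instance, a minimal keyfunc for my case might look like this. It assumes each dl item carries a url attribute; adjust for your item type:

    from urllib.parse import urlparse

    def get_domain (item) :
        return urlparse (item.url).netloc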

    Pauseque uses a deque internally instead of a queue to hold each group. I used queues at first, but put-back items can only go at the end of a queue. I want to preserve group ordering, and a deque allows adding to either end. As a bonus, my tests show that a put () - get () cycle is 3x faster with deque: 5.5us with deque vs 15.9us with queue.Queue. Per:

    python -m timeit -s 'import pauseque ; pq = pauseque.Pauseque (lambda x : str (x)) ; item = {"foo":"bar"}' 'pq.put (item) ; pq.get ()'

    Rate limiting is done externally to Pauseque because it's a dynamic decision that only the consumer thread can make. If a dl item is empty or already complete (duplicate request), then no dl is attempted and the rate limit isn't triggered. Likewise, if the server response http code indicates a timeout or temporary failure, a pause is triggered regardless of the static rate limit. Pauseque can't evaluate these conditions, so it's better to let consumer threads trigger a pause.
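
    As an illustration only, here's a simple stand-in for the external rate_limit () used above. It's a sliding-window counter rather than my leaky bucket, and MAXREQ / WINDOW are made-up numbers:

    import threading, time
    from collections import defaultdict, deque

    MAXREQ, WINDOW = 5, 10.0          # max requests per window, window in seconds
    _hits  = defaultdict (deque)      # domain -> timestamps of recent requests
    _rlock = threading.Lock ()

    def rate_limit (item) :
        'return seconds to pause, or 0 if this request may proceed'
        key = get_domain (item)
        now = time.time ()
        with _rlock :
            hits = _hits [key]
            while hits and hits [0] <= now - WINDOW :
                hits.popleft ()                   # drop timestamps outside window
            if len (hits) >= MAXREQ :
                return hits [0] + WINDOW - now    # wait til oldest drops out
            hits.append (now)
            return 0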

    One missing feature: Pauseque doesn't preserve ordering between different groups. That would be complex, so I skipped it.

    As a result, get () has to choose an item from all available (unpaused) groups. If get () simply iterated over the ready group list, groups inserted into Pauseque first would be preferentially selected. To prevent this, I run random.shuffle on ready before iterating in get (). This gives every que an equal chance of being selected. Not a perfect solution, but good enough.

    If you wanted to be more fair, you could weight the shuffle by the number of items in each group, so longer groups have a proportionally higher chance of being chosen.
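
    A quick sketch of that weighting, which could replace the random.shuffle inside get () (ready and me.outques as defined there):

    # choose one group, weighted by queue length : longer queues win more often
    weights = [ len (me.outques [k]) for k in ready ]
    key     = random.choices (ready, weights = weights) [0]
    item    = me.outques [key].popleft ()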

    I also fixed the egregious mistake the python devs made in not implementing __len__ for queues. Because why? They're afraid it won't be accurate, so they give us qsize instead? Ridiculous. Of course len is never reliable in a multithreaded world; neither is list or dict or anything else. That's a terrible reason to break the way every other container works. Thanks for nothing, devs. :eyeroll: