How to manage rate limiting of concurrent web queries to multiple domains using only the python stdlib? I'm not asking about algorithms like leaky bucket or about single-domain solutions, but about how to structure the data and code for concurrent queries to multiple domains.
The following posts have useful info but don't solve my problem: they don't address concurrent requests with per-domain rate limits, and many answers use 3rd-party modules.
This question comes closest, with quite a similar setup (though I don't revisit the same links). But its approach uses an external database and PHP / Java, not python. Too heavy; I need a python solution.
I'm downloading data from multiple domains, and more download items can be added at any time throughout the process, which is ongoing.
I can't / don't want to install 3rd-party libs from PyPI, to minimize dependencies and maintain security. The code should be self-contained, using only the python stdlib.
I use a producer / consumer implementation with a single queue and several worker threads for concurrent downloads. It uses leaky bucket for rate limiting, tracked with timestamps (no discard; rate-limited items simply wait). This works perfectly for single-domain downloads. Conceptually it looks like this:
import multiprocessing.pool
import queue
import time

def dl_worker (que) :
    while True :
        item = que.get ()
        if paused (item.domain) :   # waiting due to rate limit; paused () / backofftime () are the rate limiter
            time.sleep (backofftime (item.domain))
        # dl item...

# start pool of dl threads
dlque = queue.Queue ()
workers = multiprocessing.pool.ThreadPool (5)
with workers :
    for x in workers._pool :
        workers.apply_async (dl_worker, (dlque,))   # args must be a tuple
Problem: this uses a single queue for all download items. When dl threads pause to rate limit one domain, any items in the queue for a second domain are stuck waiting out the first domain's pause. The obvious workaround is to put rate-limited items back on the queue:
def dl_worker () :
    while True :
        item = dlque.get ()
        if paused (item.domain) :   # waiting due to rate limit
            dlque.put (item)        # move item to back of queue
            continue
        # process item...
This is wasteful: if the queue only contains items from domain1, workers will continually shuffle items from front to back during the backoff period. I want to avoid busy-wait solutions like this.
Another option is to spawn a separate pool of workers for each domain. That would isolate pauses to just the workers on that domain. But the number of domains could be quite large, resulting in a huge number of threads and resource exhaustion. The queueing also becomes more complicated: I'd need a dispatcher to consume items from an incoming queue and allocate them to separate queues, one per threadpool / domain.
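For illustration, a minimal sketch of what that dispatcher might look like (dl_worker is the per-queue worker from the first sketch; get_domain is a hypothetical helper that extracts the domain from an item):

import queue
import threading

incoming = queue.Queue ()   # producers put all download items here
domainques = {}             # domain -> dedicated queue, each served by its own workers

def spawn_worker_pool (que, nthreads = 2) :
    'start a small pool of dl workers for one domain'
    for _ in range (nthreads) :
        threading.Thread (target = dl_worker, args = (que,), daemon = True).start ()

def dispatcher () :
    'route each incoming item to its domain queue, spawning a pool on first sight'
    while True :
        item = incoming.get ()
        dom = get_domain (item)                   # hypothetical keyfunc
        if dom not in domainques :
            domainques [dom] = queue.Queue ()
            spawn_worker_pool (domainques [dom])  # one pool per domain = thread explosion
        domainques [dom].put (item)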
I could also use 3rd-party libs like pyrate-limiter or ratelimit, as seen here. But that violates my stdlib-only requirement, and it only addresses the rate limiting issue, not the multiple-domain issue.
The main problem with the current solution is having a single queue. I came up with a few other approaches, but they all have drawbacks:
Option 1: one queue per domain. Problem: there's no way to wait () on multiple queues in python (like select on file handles), so I have to manually poll all queues to see which are ready. Something like this (imagine suitable thread locks where needed):

allques = {}     # domain -> queue of items for that domain
paused = set ()  # domains currently in rate-limit backoff

def put (item) :
    que = allques.setdefault (item.domain, queue.Queue ())
    que.put (item)
def dl_worker () :
    while True :
        # find which queues have items ready to process
        active = [ x for x in allques if x not in paused ]
        ready = [ a for a in active if allques [a].qsize () ]
        if ready :
            for domain in ready :
                try :
                    item = allques [domain].get (block = False)
                    # process item ...
                    break
                except queue.Empty :
                    # qsize was wrong, no biggie, move on to next queue
                    pass
        else :
            # wait and poll again
            time.sleep (0.5)
I dislike the sleep polling. I could use a threading.Semaphore instead to track the number of current items across all queues (call semaphore.release on every que.put and semaphore.acquire before attempting que.get). But that doesn't help if all queues are currently in rate-limit backoff: dl_worker will busy-wait performing the while loop, with ready being an empty list on each pass.
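A minimal sketch of that semaphore variant (restating allques and paused from the option 1 sketch; illustrative only):

import queue
import threading

allques = {}                            # domain -> queue.Queue, as in option 1
paused = set ()                         # domains currently in backoff
items_avail = threading.Semaphore (0)   # counts items across all queues

def put (item) :
    allques.setdefault (item.domain, queue.Queue ()).put (item)
    items_avail.release ()              # wake one worker: an item exists somewhere

def dl_worker () :
    while True :
        items_avail.acquire ()          # blocks while all queues are empty...
        active = [ x for x in allques if x not in paused ]
        ready = [ a for a in active if allques [a].qsize () ]
        if not ready :
            # ...but if every queue holding items is paused, we must put the
            # count back and retry - right back to busy-waiting
            items_avail.release ()
            continue
        # get an item from a ready queue and process it...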
Using individual semaphores for each queue just creates the same polling problem.
This approach also doesn't preserve dl item order. It just grabs the first item from the first ready queue it finds. I could use random.shuffle on ready to at least randomize which queue is picked from. Preserving item order seems difficult: it would need a separate data structure to track insertion order across all queues. Seems like more trouble than it's worth.
Another option: move rate-limited items to a separate paused queue, then back to the active queue once their backoff expires. But what happens when the second item on the paused queue expires sooner than the first item? It will be stuck waiting on the paused queue until the first item is removed and put back on the active queue. Or I need a non-queue data structure for paused items, so I can pull off any item that's ready. But then I need sleep polling again (no blocking get call available).
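For example, a heap ordered by expiration time would let any ready item be pulled first. A sketch of the idea (pause_item and pop_ready are hypothetical names); note the caller still has to sleep-poll:

import heapq
import itertools
import time

_tie = itertools.count ()   # tiebreaker so unorderable items are never compared
pausedheap = []             # (expire_time, tiebreak, item), soonest expiration first

def pause_item (item, sec) :
    heapq.heappush (pausedheap, (time.time () + sec, next (_tie), item))

def pop_ready () :
    'return next expired item, else None - caller must sleep and poll again'
    if pausedheap and pausedheap [0][0] <= time.time () :
        return heapq.heappop (pausedheap) [2]
    return None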
PriorityQueue doesn't seem to help. One, its priorities are static, while mine are dynamic: hitting a rate limit triggers a pause, and the pause may lengthen while items are waiting (eg two workers grab items from the same domain at the same time; the first finishes and triggers a pause, the second finishes later and lengthens the pause). Two, PriorityQueue always returns an item if one is available. I can't set a backoff period that says "hold this item on the queue until time X, then return it". Like any get () call, it either returns or throws, so I'd still need time.sleep polling.

So far option 1 seems like the best solution, despite its drawbacks. Does anyone have better ideas?
I'm not inherently tied to a threading model; async might work as well. I hate littering my code with async / await junk and the red / blue function problem, but if there's a cleaner solution available, it's worth considering.
Another potential solution just came to me: use a scheduler thread. Something like this:

- the scheduler consumes the incoming queue, sorts items into per-domain queues, and sets an itemsready flag when an unpaused queue receives an item
- the scheduler tracks backoff expirations and sets itemsready again when a queue unpauses with items waiting
- worker threads wait () on the itemsready flag, then grab the next item from an active queue; otherwise workers sleep
This seems like a good solution. Not sure if the scheduler needs one thread or two (one to sort incoming items, a second to monitor active queues), because one thread can't wait on both things simultaneously: incomingqueue.get () and backofftimer.wait (). Or maybe it can, if I compute the time til the next backoff expiration and use that as the timeout on get ().
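A sketch of that combined wait (backoffs is assumed to be a dict of domain -> expiration time maintained by the scheduler):

import queue
import time

def scheduler_wait (incomingqueue, backoffs) :
    'block until a new item arrives or the next backoff expires, whichever is first'
    timeout = None   # no backoffs in effect: block indefinitely
    if backoffs :
        timeout = max (0, min (backoffs.values ()) - time.time ())
    try :
        return incomingqueue.get (timeout = timeout)
    except queue.Empty :
        return None  # a backoff expired first; caller unpauses that domain and loops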
I'll work on code and post an update.
This approach works beautifully. I encapsulated all the per-domain queues in a single queue class, which simplifies the algorithm: consumers access one que object, which manages separate per-domain queues internally. The item-ready flag can be updated in the get / put calls, leaving the watcher thread to just monitor backoff expiration.
import random
import threading
import time
from collections import deque

class Pauseque () : # {
# ---------------
def __init__ (me, keyfunc) :
me.keyfunc = keyfunc # extract unique key (domain) from each item
me.outques = {} # outgoing queues sorted by domain
me.paused = {} # paused queue keys with expiration time
me.lock = threading.RLock () # synchronize ops
me.gotime = threading.Event () # signal when items ready
me.notime = threading.Event () # signal when backoff period active
# spawn new thread to watch queues for ready items
        watcher = threading.Thread (target = me._watcher, daemon = True)
watcher.start ()
# ---------------
def _watcher (me) :
'watch for queues to unpause after backoff'
DEBUG and log (f'watcher started (me)')
        while True :
try :
# wait for a backoff period to be in effect
me.notime.wait ()
DEBUG and log (f'watcher : wakeup, backoff in effect')
# get time here so it doesnt change after unpause checks, which skews results
now = time.time ()
with me.lock :
# --- check if any pauses expired
# slightly more efficient to copy unpause code here
for key in list (me.paused) :
me.unpause (key)
# --- get next wakeup time
nextcheck = 0
if len (me.paused) :
nextcheck = min (me.paused.values ())
# clear flag inside lock so we dont clear pauses added by intervening put ()
else :
# all done, no backoffs in effect
me.notime.clear ()
# IMPORTANT - release lock before sleep !
if nextcheck :
delta = max (nextcheck - now, 0) # ensure not negative
# SET MAX SLEEP TIME HERE
# if not, short backoff (30s) could start and end before long backoff ends
# watcher would never wakeup to clear short backoff
# for safety, sleep minimum backoff time = least wasted time
# sleep longer if some wasted time is tolerable
delta = min (delta, 10)
DEBUG and log (f'watcher : sleep til next check : { delta:0.2f}')
time.sleep (delta)
# restart loop and wait for another backoff period
except Exception as e :
                log (f'EXCEPTION : {e!r}')
# ---------------
def get (me) :
'get item from queue. blocks until ready.'
# add block param if compatibility with queue.Queue desired
DEBUG and log (f'trying get...')
# --- keep looping til we get an item
        while True :
me.gotime.wait ()
DEBUG and log (f'get : its gotime!')
# --- try to get an item
# attempt may fail as other threads empty queue before we reach here
with me.lock :
active = [ x for x in me.outques if x not in me.paused ] # names of active queues
# size should be reliable under lock
ready = [ a for a in active if len (me.outques [a]) ]
if ready :
DEBUG and log (f'get : found ready queues : { ready }')
random.shuffle (ready)
for key in ready :
try :
item = me.outques [key].popleft ()
return item
except IndexError :
# for safety - not sure how to get here with lock?
                            log (f'deque pop failed ? { key = } : { len (me.outques [key]) }')
pass
DEBUG and log (f'get : nothing ready! making gotime false')
me.gotime.clear ()
DEBUG and log (f'get : missed our shot, back to wait')
# ---------------
def putfront (me, item) :
'return item to front of que to be next taken'
key = me.keyfunc (item)
with me.lock :
que = me.outques.setdefault (key, deque ())
que.appendleft (item)
me.gotime.set ()
size = len (que)
DEBUG and log (f'put front in queue { key } : { item } : { size = }')
# ---------------
def put (me, item) :
'return item to back of que'
key = me.keyfunc (item)
with me.lock :
que = me.outques.setdefault (key, deque ())
que.append (item)
me.gotime.set ()
size = len (que)
DEBUG and log (f'put in queue { key } : { item } : { size = }')
# ---------------
def ispaused (me, item) :
key = me.keyfunc (item)
with me.lock :
return key in me.paused
# ---------------
def pause (me, item, sec) :
key = me.keyfunc (item)
with me.lock :
me.paused [key] = time.time () + sec
me.notime.set ()
DEBUG and log (f'pausing { key } for { sec } sec')
# ---------------
    def unpause (me, key, force = False) :
'check if key que is ready to unpause and do it'
now = time.time ()
with me.lock :
expire = me.paused.get (key, 0)
if expire <= now or force :
me.paused.pop (key, 0)
me.gotime.set () # signal items ready
DEBUG and log (f'pausewatch : unpausing { key }')
# ---------------
def __len__ (me) :
with me.lock :
return sum ( len (x) for x in me.outques.values () )
# ---------------
def qsize (me) : return len (me) # for compatibility with queue.Queue
# end Pauseque }
# --- helpers
DEBUG = False
def log (msg) :
tid = threading.get_ident () % 10000
stamp = f'{ time.time () % 1000:8.3f}'
    print (f'{ tid:4d} at { stamp } : { msg }', flush = True)
Pauseque acts like a typical thread-safe que: multiple threads can put () and get () objects.
Pauseque has an additional function, pause (). This is triggered by consumer threads when they want to pause a particular group of items (by domain in my case). The que will stop returning those items during the pause period, and resume returning them when the pause period ends. Pauses are triggered by an external rate limiting algorithm in the consumer threads (see the notes below for why).
The unpause () method is mostly internal, for the watcher thread. However, consumer threads may want to use it in case of dynamic rate limiting that shortens a backoff period.
Convenience function ispaused () checks whether the que is paused for a given item, and putfront () puts an item back at the front of the que (next to be taken). This is useful when multiple consumer threads check rate limits: one consumer triggers a rate limit, the other consumers check ispaused () right before downloading and put their item back if its domain is currently paused.
In terms of the code from the original post:
def dl_worker (que) :
    while True :
        item = que.get ()
        delay = rate_limit (item)   # check if this item triggers rate limit
        if delay :
            que.pause (item, delay)
            que.putfront (item)     # put item back; retry after pause
            continue
        # do some prep on item..., then -
        if que.ispaused (item) :    # check if another thread triggered rate limit
            que.putfront (item)
            continue
        # dl item...

# start pool of dl threads
dlque = Pauseque (get_domain)       # keyfunc: extract domain from item
workers = multiprocessing.pool.ThreadPool (5)
with workers :
    for x in workers._pool :
        workers.apply_async (dl_worker, (dlque,))   # args must be a tuple
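rate_limit () above is external to Pauseque, not part of the class. As one possible shape, here's a minimal fixed-interval version per domain (MIN_INTERVAL and lasthit are illustrative; my actual limiter uses leaky bucket with timestamps):

import threading
import time

MIN_INTERVAL = 1.0   # seconds between requests to any one domain
lasthit = {}         # domain -> timestamp of last allowed request
ratelock = threading.Lock ()

def rate_limit (item) :
    'return seconds to pause if this item would exceed the rate, else 0'
    dom = get_domain (item)   # same keyfunc passed to Pauseque
    now = time.time ()
    with ratelock :
        wait = lasthit.get (dom, 0) + MIN_INTERVAL - now
        if wait > 0 :
            return wait       # caller pauses the whole domain group
        lasthit [dom] = now
        return 0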
Pauseque is much more efficient than the original version. Consumer threads never sleep; all waiting is handled by the queue. Pauseque uses the threading.Event gotime to signal when items are available, so get () calls block until signaled.
Pauseque uses a separate watcher thread to monitor groups for pause expiration and make them available again. This check blocks on the Event notime, which is set when a pause is initiated and cleared when all groups are active again.
The watcher thread has the only sleep () call in this code, used to wake up and check the next group's unpause time. This is much more efficient than the previous code, because it's only one thread and it sleeps for long stretches (up to 10 seconds at a time, per the cap in _watcher), vs every consumer thread sleeping multiple times per second.
Pauseque performs very well. When an unpause happens, consumer threads are processing items again within 1-2 ms of the scheduled resume time. Put and get operations are quite fast: 5.5 us per que.put (item) ; item = que.get () cycle in a single thread.
keyfunc is any function that sorts queue items into groups for pausing. In my case, download items are grouped by domain. Other groupings can serve as the basis for pauses (eg sort by username for traffic shaping by sender).
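For example, the get_domain keyfunc used above could be as simple as this (assuming each item exposes its url as an attribute; purely illustrative):

from urllib.parse import urlsplit

def get_domain (item) :
    'group download items by hostname'
    return urlsplit (item.url).netloc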
Pauseque uses deque internally instead of queue.Queue to hold each group. I used queues at first, but put-back items can only go at the end of a queue; I want to preserve group ordering, and deque allows adding to either end. As a bonus, my tests show that a put () / get () cycle is 3x faster with deque: 5.5 us with deque vs 15.9 us with queue.Queue. Per:

python -m timeit -s 'import pauseque ; pq = pauseque.Pauseque (lambda x : str (x)) ; item = {"foo":"bar"}' 'pq.put (item) ; pq.get ()'
Rate limiting is done externally to Pauseque because it's a dynamic decision that only the consumer thread can make. If a dl item is empty or already complete (duplicate request), no dl is attempted and the rate limit isn't triggered. Likewise, if the server's response http code indicates a timeout or temporary failure, a pause is triggered regardless of the static rate limit. Pauseque can't evaluate these conditions, so it's better to let consumer threads trigger the pause.
One missing feature is that Pauseque doesn't preserve ordering between different groups. It would be complex, so I skipped it.

As a result, get () has to choose an item from all available (unpaused) groups. If get () simply iterated over the ready group list, groups inserted into Pauseque first would be preferentially selected. To prevent this, I use random.shuffle on ready before iterating in get (). This ensures every que has an equal chance of being selected. Not a perfect solution, but good enough.
If you wanted to be more fair, you could weight the selection by the number of items in each group, so longer groups have a proportionally higher chance of being chosen.
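For instance, random.choices accepts weights, so get () could pick the group like this (a sketch; I didn't implement it):

import random

def pick_group (ready, outques) :
    'choose one ready group, weighted by queue length'
    weights = [ len (outques [key]) for key in ready ]
    return random.choices (ready, weights = weights, k = 1) [0]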
I also fixed the egregious mistake of the python devs not implementing __len__ for queues. Because why? They're afraid it won't be accurate, so they give us qsize instead? Ridiculous. Of course len is never reliable in a multithreaded world; neither is list or dict or anything else. That's a terrible reason to break the way every other container works. Thanks for nothing, devs. :eyeroll: