
How to slow down asynchronous API calls to match API limits?


I have a list of ~300K URLs for an API I need to get data from.

The API limit is 100 calls per second.

I have written a class that makes the requests asynchronously, but it runs too fast and I am hitting an error on the API.

How do I slow down the asynchronous requests so that I make no more than 100 calls per second?

import grequests

lst = ['url.com','url2.com']

class Test:
    def __init__(self):
        self.urls = lst

    def exception(self, request, exception):
        print("Problem: {}: {}".format(request.url, exception))

    # "async" is a reserved keyword since Python 3.7, so the method needs another name
    def fetch_all(self):
        return grequests.map((grequests.get(u) for u in self.urls), exception_handler=self.exception, size=5)

    def collate_responses(self, results):
        # failed requests show up as None in the results, so skip them
        return [x.text for x in results if x is not None]

test = Test()
# here we collect the results returned by the asynchronous fetch
results = test.fetch_all()
response_text = test.collate_responses(results)

Solution

  • The first step that I took was to create an object that can distribute a maximum of n coins every t ms.

    import time
    
    class CoinsDistribution:
        """Object that distributes a maximum of maxCoins every timeLimit ms"""
        def __init__(self, maxCoins, timeLimit):
            self.maxCoins = maxCoins
            self.timeLimit = timeLimit
            self.coin = maxCoins
            self.time = time.perf_counter()
    
    
        def getCoin(self):
            if self.coin <= 0 and not self.restock():
                return False
    
            self.coin -= 1
            return True
    
        def restock(self):
            t = time.perf_counter()
            if (t - self.time) * 1000 < self.timeLimit:
                return False
            self.coin = self.maxCoins
            self.time = t
            return True
    

    Now we need a way to make sure functions are only called when they can get a coin. To do that, we can write a decorator that is used like this:

    @limitCalls(callLimit=1, timeLimit=1000)
    def uniqFunctionRequestingServer1():
        return 'response from s1'
    

    Sometimes, though, several functions send requests to the same server, so we want them to draw coins from the same CoinsDistribution object. The decorator can therefore also be given an existing CoinsDistribution object:

    server_2_limit = CoinsDistribution(3, 1000)
    
    @limitCalls(server_2_limit)
    def sendRequestToServer2():
        return 'it worked !!'
    
    @limitCalls(server_2_limit)
    def sendAnOtherRequestToServer2():
        return 'it worked too !!'
    

    We now have to write the decorator. It accepts either a CoinsDistribution object or enough data to create a new one.

    import functools
    
    def limitCalls(obj=None, *, callLimit=100, timeLimit=1000):
        if obj is None:
            obj = CoinsDistribution(callLimit, timeLimit)
    
        def limit_decorator(func):
            @functools.wraps(func)
            def limit_wrapper(*args, **kwargs):
                if obj.getCoin():
                    return func(*args, **kwargs)
                return 'limit reached, please wait'
            return limit_wrapper
        return limit_decorator
    

    And it's done! Now you can limit the number of calls to any API that you use, and you can build a dictionary to keep track of your CoinsDistribution objects if you have to manage a lot of them (for different API endpoints or for different APIs).

    Note: Here I have chosen to return an error message if there are no coins available. You should adapt this behaviour to your needs.
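
If waiting is preferable to an error message, one possible adaptation is to poll until a coin becomes available. The sketch below is only an illustration: waitForCoin, limitCallsBlocking, the limiters dictionary, the host name and the stub fetch function are hypothetical names, and the class is a condensed copy of CoinsDistribution so that the block runs on its own.

```python
import functools
import time


class CoinsDistribution:
    """Condensed copy of the class above: maxCoins coins every timeLimit ms."""

    def __init__(self, maxCoins, timeLimit):
        self.maxCoins = maxCoins
        self.timeLimit = timeLimit
        self.coin = maxCoins
        self.time = time.perf_counter()

    def getCoin(self):
        if self.coin <= 0 and not self.restock():
            return False
        self.coin -= 1
        return True

    def restock(self):
        t = time.perf_counter()
        if (t - self.time) * 1000 < self.timeLimit:
            return False
        self.coin = self.maxCoins
        self.time = t
        return True


def waitForCoin(obj, pollInterval=0.005):
    """Hypothetical helper: sleep in short steps until obj hands out a coin."""
    while not obj.getCoin():
        time.sleep(pollInterval)


def limitCallsBlocking(obj):
    """Hypothetical variant of limitCalls that waits instead of returning a message."""
    def limit_decorator(func):
        @functools.wraps(func)
        def limit_wrapper(*args, **kwargs):
            waitForCoin(obj)
            return func(*args, **kwargs)
        return limit_wrapper
    return limit_decorator


# One limiter per API host, kept in a dictionary (the host name is a placeholder).
limiters = {
    "api.example.com": CoinsDistribution(5, 200),  # 5 calls per 200 ms
}


@limitCallsBlocking(limiters["api.example.com"])
def fetch(url):
    # Stand-in for a real HTTP call; no network access happens here.
    return "response for " + url


start = time.perf_counter()
responses = [fetch("https://api.example.com/item/%d" % i) for i in range(12)]
elapsed = time.perf_counter() - start
print(len(responses), "calls in", round(elapsed, 2), "s")
```

With 5 coins per 200 ms, the 12 calls have to wait for two restocks, so the loop takes roughly 0.4 s. For the API in the question the limiter would be CoinsDistribution(100, 1000). Since the window is fixed and coins are only restocked once they run out, a burst just before a restock followed by another just after it can briefly exceed the limit, so a slightly lower callLimit leaves some headroom.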