Tags: python, python-asyncio, http-status-code-429

How to avoid error 429 (Too Many Requests) in Python with asyncio


I am using the following code to make requests with the aiohttp client. The server I am sending requests to has a limit of 30k requests per hour per IP, so I am getting a 429 Too Many Requests error. I want to put the job to sleep whenever it hits the limit.

I can extract x_rateLimit_reset from the response headers, so I thought I could use it to put the job to sleep, but I observed very strange behavior: sometimes the sleep time becomes negative and sometimes the job gets stuck in sleep mode.

For example, the last time I ran the job, it first slept for 2000 seconds, and after that time passed it tried to sleep for another 2500 seconds and got stuck in sleep mode. I think the other parallel processes may have caused the issue, so I was wondering how to deal with the Too Many Requests error when using asyncio.

    @backoff.on_exception(backoff.expo,
                          (asyncio.TimeoutError,
                           aiohttp.client_exceptions.ServerDisconnectedError,
                           TooManyRequests),
                          max_time=300)
    async def fetch(self, url, session, params):
        try:
            async with session.get(url, params=params) as response:
                now = int(time.time())
                print(response)
                output = await response.read()
                output = json.loads(output)

                if 'X-RateLimit-Remaining' in response.headers:
                    rate = response.headers['X-RateLimit-Remaining']

                if 'status' in output and output['status'] == 429:
                    x_rateLimit_reset = int(response.headers['X-RateLimit-Reset'])
                    print("sleep mode")
                    seconds = x_rateLimit_reset - now
                    LOGGER.info("The job will sleep for {} seconds".format(seconds))
                    time.sleep(max(seconds,0))
                    raise TooManyRequests()

            return output

        except (asyncio.TimeoutError, TypeError, json.decoder.JSONDecodeError,
                aiohttp.client_exceptions.ServerDisconnectedError) as e:
            print(str(e))

    async def bound_fetch(self, sem, url, session, params):
        # Getter function with semaphore.
        async with sem:
            output = await self.fetch(url, session, params)
        return {"url": url, "output": output}

Edit: this is how I invoke bound_fetch and define the URLs:

def get_responses(self, urls, office_token, params=None):   
    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(self.run(office_token, urls, params))
    responses = loop.run_until_complete(future)
    return responses

async def run(self, office_token, urls, params):
    tasks = []
    # create an instance of Semaphore
    sem = asyncio.BoundedSemaphore(200)
    timeout = ClientTimeout(total=1000)

    async with ClientSession(auth=BasicAuth(office_token, password=' '), timeout=timeout,
                             connector=TCPConnector(ssl=False)) as session:
        for url in urls:
            # pass Semaphore and session to every GET request
            task = asyncio.ensure_future(self.bound_fetch(sem, url, session, params))
            tasks.append(task)

        responses = await asyncio.gather(*tasks)
        return responses

urls = ["{}/{}".format(self.base_url,
                       "{}?page={}&api_key={}".format(object_name, page_number, self.api_keys))
        for page_number in range(batch * chunk_size + 1, chunk_size * (1 + batch) + 1)]

Solution

  • The main reason is that you are using time.sleep() instead of await asyncio.sleep().
    time.sleep() blocks the whole event loop, so every coroutine freezes along with it;
    by the time the others wake up, the X-RateLimit-Reset timestamp can already be in
    the past, which is why the computed sleep time sometimes goes negative and the job
    appears to get stuck.
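
    For example, the problematic branch of your fetch() could become the
    following (a sketch reusing the names from your code; the only real
    change is the sleep call):

    if 'status' in output and output['status'] == 429:
        x_rateLimit_reset = int(response.headers['X-RateLimit-Reset'])
        seconds = max(x_rateLimit_reset - int(time.time()), 0)
        LOGGER.info("The job will sleep for {} seconds".format(seconds))
        await asyncio.sleep(seconds)  # suspends only this coroutine; the loop keeps running
        raise TooManyRequests()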

    UPDATE

    Here is a minimal working solution. I added some comments to describe how it works.

    Please adapt this code to meet your requirements.

    BTW, you can take a look at asyncio-throttle (a short sketch of that approach is at the end of this answer).

    import aiohttp
    import asyncio
    from datetime import datetime
    
    
    async def fetch(session, task):  # fetch a URL and mark the result of execution
        async with session.get(task['url']) as response:
            if response.status != 200:
                # response.raise_for_status()
                # Here you would need to handle a 429 code if it occurs.
                # In this example I just record the status and move on.
                task['result'] = response.status
                task['status'] = 'done'
            await response.text()  # make sure the response body is fully read
            print(f"{str(datetime.now())}: Got result of {task['url']}")  # logging
            task['result'] = response.status
            task['status'] = 'done'
    
    
    async def fetch_all(session, urls, persecond):
        # convert to list of dicts
        url_tasks = [{'url': i, 'result': None, 'status': 'new'} for i in urls]
        n = 0  # counter
        while True:
            # calc how many tasks are fetching right now
            running_tasks = len([i for i in url_tasks if i['status'] in ['fetch']])
            # calc how many tasks are still need to be executed
            is_tasks_to_wait = len([i for i in url_tasks if i['status'] != 'done'])
            # check we are not at the end of the list (n < len(url_tasks))
            # check we have room for one more task
            if n < len(url_tasks) and running_tasks < persecond:
                url_tasks[n]['status'] = 'fetch'
                #
                # Here is the main trick:
                # asyncio.create_task() schedules the coroutine on the
                # already-running loop; it starts executing as soon as
                # the current coroutine yields control at an await
                #
                asyncio.create_task(fetch(session, url_tasks[n]))
                n += 1
                print(f'Schedule tasks {n}. '
                      f'Running {running_tasks} '
                      f'Remain {is_tasks_to_wait}')
            # Check the per-second constraint and wait a second (or period)
            if running_tasks >= persecond:
                print('Throttling')
                await asyncio.sleep(1)
            #
            # Here is another main trick:
            # to keep asyncio.run (or loop.run_until_complete) executing,
            # we wait a little, then check whether all tasks are done,
            # and repeat until they are
            if is_tasks_to_wait != 0:
                await asyncio.sleep(0.1)  # give running tasks time to finish, then re-check
            else:
                # All tasks done
                break
        return url_tasks
    
    
    async def main():
        urls = ['http://google.com/?1',
                'http://google.com/?2',
                'http://google.com/?3']*3
        async with aiohttp.ClientSession() as session:
            res = await fetch_all(session, urls, 3)
            print(res)
    
    if __name__ == '__main__':
        asyncio.run(main())
        # asyncio.run() cancels any pending tasks (we have none, because
        # we checked that all tasks were done), awaits the cancellation,
        # stops the loop, and then the program exits
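
    For reference, here is a minimal sketch of the asyncio-throttle approach
    mentioned above. It assumes the package is installed (pip install
    asyncio-throttle); the rate_limit/period values are placeholders you
    would tune to your budget (30k per hour is roughly 8 per second):

    import asyncio
    import aiohttp
    from asyncio_throttle import Throttler


    async def fetch(session, throttler, url):
        async with throttler:  # blocks here once the rate limit is reached
            async with session.get(url) as response:
                return response.status


    async def main():
        # ~8 requests/second stays under 30k per hour
        throttler = Throttler(rate_limit=8, period=1.0)
        async with aiohttp.ClientSession() as session:
            urls = ['http://google.com/?{}'.format(i) for i in range(30)]
            statuses = await asyncio.gather(
                *(fetch(session, throttler, url) for url in urls))
            print(statuses)


    if __name__ == '__main__':
        asyncio.run(main())

    This throttles requests before they are sent instead of reacting to 429
    responses after the fact.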