Tags: python-3.x, python-requests, get, python-asyncio, httpx

Using httpx to send 100K GET requests


I'm using the httpx library and asyncio to try to send about 100K GET requests.

I ran the code and received httpx.ConnectError, so I opened Wireshark and saw a lot of messages saying TCP Retransmission and TCP Port numbers reused (Wireshark screenshot).

After seeing the data in Wireshark and the httpx.ConnectError, I added limits = httpx.Limits(max_connections=10000) to cap the number of active connections at 10,000, but I still get that error.
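
For reference, a minimal sketch of another way to cap concurrency, using an asyncio.Semaphore so that only a fixed number of requests are in flight at once (the CONCURRENCY value and the fetch/search_throttled helpers are illustrative and not part of my original code); max_connections only bounds the connection pool, while asyncio.gather still creates every task up front:

import asyncio
import httpx

CONCURRENCY = 100  # illustrative cap on simultaneous requests, not a value from my real code


async def fetch(client: httpx.AsyncClient, sem: asyncio.Semaphore, url: str) -> httpx.Response:
    # the semaphore ensures at most CONCURRENCY requests are awaiting a response at any time
    async with sem:
        return await client.get(url)


async def search_throttled(urls):
    sem = asyncio.Semaphore(CONCURRENCY)
    async with httpx.AsyncClient(timeout=None) as client:
        return await asyncio.gather(*(fetch(client, sem, url) for url in urls))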

my code:

import asyncio
import json
import httpx


SOME_URL = "some url"
ANOTHER_URL = "another url"
MAX = 10000


async def search():
    guids = [guid for guid in range(688001, 800000)]  # 688001 - 838611
    timeout = httpx.Timeout(None)
    limits = httpx.Limits(max_connections=MAX)

    async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
        tasks = [client.get(f"{SOME_URL}{guid}", timeout=timeout) for guid in guids]

        blob_list = await asyncio.gather(*tasks)  # <---- error from here !!!!!

        blob_list = [(res, guid) for res, guid in zip(blob_list, guids)]

    guids = [guid for res, guid in blob_list]
    blob_list = [json.loads(res.text)["blob_name"] for res, guid in blob_list]

    async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
        tasks = [client.get(f"{ANOTHER_URL}{blob}", timeout=timeout) for blob in blob_list]

        game_results = await asyncio.gather(*tasks)  # <---- error from here !!!!!

        game_results = [(res, guid) for res, guid in zip(game_results, guids)]

    game_results = [guid for res, guid in game_results]

    print(game_results)


def main():
    asyncio.run(search())


if __name__ == '__main__':
    main()

This is a minimal version of my code; there are some steps between the requests that I deleted, but I didn't touch the code that causes the trouble. The lines where I receive the errors are marked with comments (# <---- error from here !!!!!).

Does anyone know how to solve this, or another way to send about 100K GET requests fast?


Solution

  • I managed to solve my problem with the following code:

    (this is not the entire code, only the parts needed to send the requests; I have some stuff in between)

    import asyncio
    
    from aiohttp import ClientSession
    
    
    SOME_URL = "some url"
    ANOTHER_URL = "another url"
    MAX_SIM_CONNS = 50
    worker_responses = []
    
    
    async def fetch(url, session):
        async with session.get(url) as response:
            return await response.read()
    
    
    async def fetch_worker(url_queue: asyncio.Queue):
        global worker_responses
        async with ClientSession() as session:
            while True:
                url = await url_queue.get()
                try:
                    if url is None:
                        return
                    response = await fetch(url, session)
                    worker_responses.append(response)
                finally:
                    url_queue.task_done()
                    # calling task_done() is necessary for the url_queue.join() to work correctly
    
    
    async def fetch_all(base_url: str, range_: range):
        url_queue = asyncio.Queue(maxsize=10000)
        worker_tasks = []
        for i in range(MAX_SIM_CONNS):
            wt = asyncio.create_task(fetch_worker(url_queue))
            worker_tasks.append(wt)
        for i in range_:
            await url_queue.put(f"{base_url}{i}")
        for i in range(MAX_SIM_CONNS):
            # tell the workers that the work is done
            await url_queue.put(None)
        await url_queue.join()
        await asyncio.gather(*worker_tasks)
    
    if __name__ == '__main__':
        # force the selector event loop on Windows instead of the default proactor loop
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        asyncio.run(fetch_all(SOME_URL, range(680_842, 840_423)))
        print(worker_responses)
    

    I used aiohttp instead of httpx, and an asyncio.Queue with a fixed number of workers to reduce RAM usage, and it worked for me.
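
    For comparison, here is a minimal sketch of the same bounded-worker/queue pattern written with httpx (the library from the question) instead of aiohttp; the helper names and placeholder URLs are illustrative, and this is not the code I actually ran:

    import asyncio

    import httpx


    SOME_URL = "some url"
    MAX_SIM_CONNS = 50


    async def fetch_worker(url_queue: asyncio.Queue, results: list, client: httpx.AsyncClient):
        while True:
            url = await url_queue.get()
            try:
                if url is None:
                    return
                response = await client.get(url)
                results.append(response.text)
            finally:
                # task_done() must be called for every item so url_queue.join() can finish
                url_queue.task_done()


    async def fetch_all(base_url: str, range_: range):
        results = []
        url_queue = asyncio.Queue(maxsize=10000)
        async with httpx.AsyncClient(timeout=None) as client:
            workers = [asyncio.create_task(fetch_worker(url_queue, results, client))
                       for _ in range(MAX_SIM_CONNS)]
            for i in range_:
                await url_queue.put(f"{base_url}{i}")
            for _ in range(MAX_SIM_CONNS):
                # one sentinel per worker tells it to stop
                await url_queue.put(None)
            await url_queue.join()
            await asyncio.gather(*workers)
        return results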