Tags: python, multithreading, python-asyncio, gevent, curio

Concurrent URL fetching loops with Python


I need to run around 500 concurrent loops. Each loop sequentially fetches a paginated REST endpoint until it reaches that endpoint's last page. Some of these loops have just 5 to 10 pages, so they finish quickly, but others have hundreds of pages.

The problem is that the URL fetching has to happen in a sequential, blocking loop: each page must be fetched in order due to API limitations (the API will throw an error if I fetch page 7 and then page 5, for instance). So the unit of parallelism here is each loop, not each URL fetch inside a loop.

There is no heavy computation done anywhere. I just fetch a page and throw the raw content into a Kafka topic. I'm open to any suggestion except a multi-process approach that depends on many cores: asyncio, gevent, multithreading...
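To be concrete, the per-page work is roughly this shape (just a sketch; aiokafka and the topic name are illustrative, not a fixed part of my setup):

    from aiokafka import AIOKafkaProducer

    async def publish_page(producer: AIOKafkaProducer, raw_page: bytes):
        # No processing: just forward the raw page body to a Kafka topic.
        # 'pages-raw' is a placeholder topic name.
        await producer.send_and_wait('pages-raw', raw_page)

    # The producer would be created once and shared by all loops:
    # producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
    # await producer.start()
    # ... run the fetching loops ...
    # await producer.stop()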

Edit 1:

The actual problem is that if I use aiohttp to fetch the pages inside each loop asynchronously, I have no guarantee that page 2 will be fetched after page 1. The requests will be initiated in the correct sequence, but there is absolutely no guarantee that they will arrive at, and be processed by, the endpoint in that sequence.

Edit 2:

As pointed out by user4815162342, aiohttp should work.

Thank you!


Solution

  • In asyncio you can start as many loops in parallel as there are endpoints, and wait for all of them to finish. Each loop will use aiohttp to fetch its endpoint's pages sequentially. For example:

    import asyncio
    import itertools

    import aiohttp

    async def download_loop(session, endpoint):
        # Pages must be fetched in order, so each request is awaited
        # before the next one is issued.
        for i in itertools.count(1):
            try:
                async with session.get(endpoint, params={'page': str(i)}) as resp:
                    content = await resp.read()
            except aiohttp.ClientResponseError:
                break   # no more pages
            # do something with the response content

    async def download(endpoints):
        # raise_for_status=True makes HTTP error responses raise
        # ClientResponseError, which ends each per-endpoint loop above.
        async with aiohttp.ClientSession(raise_for_status=True) as session:
            # Start all loops in parallel and wait for them to finish.
            # This will start as many loops as there are endpoints.
            await asyncio.gather(*(download_loop(session, endpoint)
                                   for endpoint in endpoints))

    # for testing:
    asyncio.run(download(['http://endpoint1', 'http://endpoint2', ...]))
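
    With around 500 endpoints running at once, note that aiohttp's default connector caps the number of simultaneous connections at 100. If you want all loops to actually run in parallel, you can pass a connector with a higher limit (a sketch; the exact limit is a judgment call):

    connector = aiohttp.TCPConnector(limit=500)
    async with aiohttp.ClientSession(connector=connector,
                                     raise_for_status=True) as session:
        ...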
    

    Production code would probably also catch aiohttp.ClientConnectionError and retry that URL.
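
    A simple bounded retry around each page fetch might look like this (a sketch; the retry count and backoff are arbitrary choices, not part of the original answer):

    async def fetch_page(session, endpoint, page, retries=3):
        # Retry transient connection errors a few times before giving up.
        for attempt in range(retries):
            try:
                async with session.get(endpoint, params={'page': str(page)}) as resp:
                    return await resp.read()
            except aiohttp.ClientConnectionError:
                if attempt == retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)   # simple exponential backoff

    download_loop would then call fetch_page instead of calling session.get directly, with its existing except clause still catching ClientResponseError to detect the last page.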