Tags: python, asynchronous, async-await, python-asyncio, sleep

When using asyncio, how can I throttle the rate of a dependent sequence of web requests without blocking responses to these requests?


I am attempting to send out a grid, or lattice, or "matrix" of API requests using asyncio: for category 1, I may wish to request datasets 1a, 1b, and 1c; for category 2, datasets 2a, 2b, and 2c; and for category 3, datasets 3a, 3b, and 3c. Whether I request 1b depends on the contents of 1a, and whether I request 1c depends on the contents of 1a and 1b, so I have to request 1a, 1b, and 1c in that order. The analogous statements hold for categories 2 and 3.

The server receiving these requests takes 0.15 sec to respond to each request, but can serve up to 10 requests/sec. So I may be able to reduce my runtime by about a third by parallelizing the requests using asyncio. But to do so I need to determine how to throttle my rate of sending to 10 requests/sec.
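
To make the arithmetic explicit (my own derivation from the figures above): N sequential requests take about 0.15N sec, while at the 10 requests/sec cap they take about N/10 = 0.1N sec, a saving of (0.15 - 0.1)/0.15 ≈ 33%.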

If I wanted to collect all 1a, 1b, ..., 3c (i.e., if the requests were independent), then I believe I could just use a sequence of create_task calls in a for loop like:

import asyncio
import aiohttp

secs_per_req = 0.1  # 1 / (10 requests/sec)

async def main():
    async with aiohttp.ClientSession() as session:
        async with asyncio.TaskGroup() as tg:
            for k in [1, 2, 3]:
                tg.create_task(collect_a_data(k, session))
                await asyncio.sleep(secs_per_req)
                tg.create_task(collect_b_data(k, session))
                await asyncio.sleep(secs_per_req)
                tg.create_task(collect_c_data(k, session))
                await asyncio.sleep(secs_per_req)
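
For concreteness, one of these collectors might look something like the sketch below; the endpoint URL and JSON handling are placeholders of my own:

# Hypothetical collector: endpoint and payload handling are placeholders.
async def collect_a_data(k: int, session: aiohttp.ClientSession):
    async with session.get(f"https://api.example.com/data/{k}a") as resp:
        resp.raise_for_status()
        return await resp.json()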

However, the fact that 1b and 1c depend on 1a, and that 1c also depends on 1b, suggests that I should instead try a for loop like:

secs_per_req = 0.1  # 1 / (10 requests/sec)

async def main():
    async with aiohttp.ClientSession() as session:
        async with asyncio.TaskGroup() as tg:
            for k in [1, 2, 3]:
                tg.create_task(collect_abc_data(k, session))
                await asyncio.sleep(secs_per_req)

where collect_abc_data(k, session) collects ka and, if needed, kb and kc, and contains all of the logic for deciding whether to request kb and kc. But then I need a way to limit the rate of the requests within collect_abc_data. A natural way to do this would be to sleep (in some manner) immediately before sending each request. However, a sleep inside one task does nothing to coordinate with the sleeps in the other concurrently running tasks, so the combined rate across all categories could still exceed 10 requests/sec (see the sketch below).
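
To illustrate, here is a minimal sketch of that naive per-task throttling; fetch, need_b, and need_c are hypothetical placeholders, not code I already have:

async def collect_abc_data(k: int, session: aiohttp.ClientSession):
    # Sleeping here throttles only THIS task; several such tasks running
    # concurrently never coordinate their sleeps, so the combined rate
    # across all categories can still exceed 10 requests/sec.
    await asyncio.sleep(secs_per_req)
    a = await fetch(f"/data/{k}a", session)
    if need_b(a):
        await asyncio.sleep(secs_per_req)
        b = await fetch(f"/data/{k}b", session)
        if need_c(a, b):
            await asyncio.sleep(secs_per_req)
            c = await fetch(f"/data/{k}c", session)
    # … process whatever was collected …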

So, if the above approaches fail, is there a standard way to solve this problem within the asyncio framework?


Solution

  • Use one shared rate limiter around every HTTP call.
    That lets each category run its own a -> b -> c chain sequentially while all
    chains run in parallel, yet the whole program never sends more than
    10 requests per second.

    import asyncio, aiohttp
    from aiolimiter import AsyncLimiter       # pip install aiolimiter
    
    limiter = AsyncLimiter(10, 1)             # 10 requests every 1 s
    
    async def fetch(url: str, session: aiohttp.ClientSession):
        # blocks if we've already done 10 calls in the last second
        async with limiter:
            async with session.get(url) as r:
                r.raise_for_status()
                return await r.json()
    
    async def collect_abc(k: int, session: aiohttp.ClientSession):
        b = c = None                          # kb and kc are only fetched if needed
        a = await fetch(f"/data/{k}a", session)

        if need_b(a):                         # your own predicate
            b = await fetch(f"/data/{k}b", session)

            if need_c(a, b):
                c = await fetch(f"/data/{k}c", session)
        # … process (a, b, c) here …
    
    async def main():
        # base_url is a placeholder host so the relative /data/... paths resolve
        async with aiohttp.ClientSession(base_url="https://api.example.com") as session:
            async with asyncio.TaskGroup() as tg:
                for k in (1, 2, 3):           # thousands are fine too
                    tg.create_task(collect_abc(k, session))
    
    asyncio.run(main())
    
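    Two notes: asyncio.TaskGroup requires Python 3.11+ (on older versions,
    asyncio.gather over the same coroutines works); and AsyncLimiter is a
    leaky-bucket limiter, so it allows an initial burst of up to 10 requests
    and thereafter admits waiting requests continuously at 10 per second.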

    If you prefer no third-party library, start a background task that refills
    an asyncio.Semaphore(10) once per second; the pattern is the same, and one
    way it might look is sketched below.
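
    This sketch is my own, not a library API; note that it is burstier than
    AsyncLimiter because all permits come back at once each second:

    import asyncio

    class SemaphoreRateLimiter:
        """DIY sketch: allow at most `rate` acquisitions per one-second window."""

        def __init__(self, rate: int):
            self._sem = asyncio.Semaphore(rate)
            self._used = 0                    # permits consumed since last refill
            self._refiller = None

        async def __aenter__(self):
            if self._refiller is None:        # lazily start the refill loop
                self._refiller = asyncio.create_task(self._refill())
            await self._sem.acquire()
            self._used += 1
            return self

        async def __aexit__(self, *exc):
            return False                      # permits come back via _refill

        async def _refill(self):
            # Once per second, hand back every permit consumed in that window.
            # A production version should also cancel this task on shutdown.
            while True:
                await asyncio.sleep(1)
                for _ in range(self._used):
                    self._sem.release()
                self._used = 0

    limiter = SemaphoreRateLimiter(10)        # drop-in for the AsyncLimiter above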