I am attempting to send out a grid, or lattice, or "matrix" of API requests using `asyncio`: for a category 1, I may wish to request the datasets 1a, 1b, and 1c. For category 2, I may wish to request the datasets 2a, 2b, and 2c. And for category 3, I may wish to request the datasets 3a, 3b, and 3c. Whether I request 1b depends on the contents of 1a, and whether I request 1c depends on the contents of 1a and 1b, so I must request 1a, 1b, and 1c in that order. Analogous statements hold for categories 2 and 3.
The server receiving these requests takes 0.15 s to respond to each request, but can serve up to 10 requests/sec. So I may be able to reduce my runtime by up to ~90% by parallelizing the requests with `asyncio`. But to do so, I need to determine how to throttle my send rate to 10 requests/sec.
If I wanted to collect all of 1a, 1b, ..., 3c unconditionally (i.e., if the requests were independent), then I believe I could just use a sequence of `create_task` calls in a `for` loop like:

```python
secs_per_req = 0.1
async with aiohttp.ClientSession() as session:
    async with asyncio.TaskGroup() as tg:
        for k in [1, 2, 3]:
            tg.create_task(collect_a_data(k, session))
            await asyncio.sleep(secs_per_req)
            tg.create_task(collect_b_data(k, session))
            await asyncio.sleep(secs_per_req)
            tg.create_task(collect_c_data(k, session))
            await asyncio.sleep(secs_per_req)
```
However, since 1b and 1c depend on 1a, and 1c depends on 1b, it seems I should instead try a `for` loop like:

```python
secs_per_req = 0.1
async with aiohttp.ClientSession() as session:
    async with asyncio.TaskGroup() as tg:
        for k in [1, 2, 3]:
            tg.create_task(collect_abc_data(k, session))
            await asyncio.sleep(secs_per_req)
```
where `collect_abc_data(k, session)` collects ka and, if needed, kb and kc, and contains all of the logic for deciding whether to request kb and kc. But then I need a way to limit the rate of the requests within `collect_abc_data`. A natural way to do this would be to sleep (in some manner) immediately before sending each request. However:

- Including an `await asyncio.sleep(secs_per_req)` before each request in `collect_abc_data` does not limit the rate at which requests are sent: the event loop simply moves on to the next task, which continues sending requests without pause.
- Including a `time.sleep(secs_per_req)` before each request comes closer to the desired result: it blocks every other task during the sleep interval, which does limit the send rate. However, it also blocks in a second way: it forces the requests for 1a, 2a, and 3a to be sent before the response to 1a is processed. For my use this is nonideal, because it delays the collection of ka, kb, and kc, which in turn delays downstream processing of that data. This becomes especially problematic when there are, say, tens of thousands of k values to request, as in my case; the delay may then amount to hours.
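The difference between the two sleeps can be seen in a minimal sketch (toy workers with made-up names, no HTTP):

```python
import asyncio
import time

async def blocking_worker():
    time.sleep(0.05)           # blocks the entire event loop for 0.05 s

async def yielding_worker():
    await asyncio.sleep(0.05)  # yields control; other tasks run meanwhile

async def timed(worker):
    # Run two copies of the worker concurrently and measure wall time.
    start = time.monotonic()
    await asyncio.gather(worker(), worker())
    return time.monotonic() - start

print(asyncio.run(timed(blocking_worker)))  # ≈ 0.10 s: workers serialized
print(asyncio.run(timed(yielding_worker)))  # ≈ 0.05 s: workers overlapped
```

Two blocking workers run back to back, while two yielding workers overlap, which is why `asyncio.sleep` inside each task cannot pace requests issued from independent tasks.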
So, given that the above approaches fail, is there a standard way to solve this problem within the `asyncio` framework?
Use one shared rate limiter around every HTTP call. That lets each category run its own a -> b -> c chain sequentially while all chains run in parallel, yet the program as a whole never sends more than 10 requests per second.
```python
import asyncio

import aiohttp
from aiolimiter import AsyncLimiter  # pip install aiolimiter

limiter = AsyncLimiter(10, 1)  # at most 10 requests every 1 s

async def fetch(url: str, session: aiohttp.ClientSession):
    # Blocks here if we've already made 10 calls in the last second.
    async with limiter:
        async with session.get(url) as r:
            r.raise_for_status()
            return await r.json()

async def collect_abc(k: int, session: aiohttp.ClientSession):
    b = c = None
    a = await fetch(f"/data/{k}a", session)
    if need_b(a):                      # your own predicate
        b = await fetch(f"/data/{k}b", session)
        if need_c(a, b):
            c = await fetch(f"/data/{k}c", session)
    # … process (a, b, c) here …

async def main():
    # Relative paths like "/data/1a" require a base_url on the session.
    async with aiohttp.ClientSession(base_url="https://example.com") as session:
        async with asyncio.TaskGroup() as tg:
            for k in (1, 2, 3):        # thousands of k values are fine too
                tg.create_task(collect_abc(k, session))

asyncio.run(main())
```
`collect_abc` keeps the dependency order with plain `await`s. Every `fetch` first acquires the `AsyncLimiter`, so all tasks together stay below the 10 req/s ceiling.
If you prefer to avoid a third-party library, start a background task that refills an `asyncio.Semaphore(10)` once per second; the pattern is the same.
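A minimal sketch of that stdlib-only variant (the class name and refill bookkeeping are my own, not a standard API):

```python
import asyncio

class RefillingLimiter:
    """Coarse token bucket: at most `rate` acquisitions per `period`
    seconds, built only on asyncio.Semaphore."""

    def __init__(self, rate: int, period: float = 1.0):
        self._rate = rate
        self._period = period
        self._sem = asyncio.Semaphore(rate)
        self._spent = 0        # tokens consumed since the last refill
        self._refiller = None  # background refill task, started lazily

    async def __aenter__(self):
        if self._refiller is None:
            # Start the refill loop on first use, inside a running loop.
            self._refiller = asyncio.create_task(self._refill())
        await self._sem.acquire()
        self._spent += 1

    async def __aexit__(self, *exc):
        return False           # tokens come back via _refill, not on exit

    async def _refill(self):
        while True:
            await asyncio.sleep(self._period)
            # Return exactly the tokens spent in the last period,
            # so the bucket never grows beyond `rate`.
            for _ in range(self._spent):
                self._sem.release()
            self._spent = 0
```

`fetch` would then use one shared `limiter = RefillingLimiter(10)` with `async with limiter:` exactly where it used the `AsyncLimiter` above.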