I have the following code (I redacted it and made it a bit simpler).
class AbstractRateLimiter(ABC):
@abstractmethod
async def acquire(self, *args, **kwargs):
pass
@abstractmethod
async def release(self):
pass
class RateLimiterB(AbstractRateLimiter):
def __init__(self, rpm: int = 0):
# from aiolimiter import AsyncLimiter
self.limiter = AsyncLimiter(rpm) if rpm > 0 else None
async def acquire(self, *args, **kwargs):
if self.limiter:
await self.limiter.acquire()
async def release(self):
pass
class RateLimiterA(AbstractRateLimiter):
def __init__(self, concurrency_limit: int = 0):
self._semaphore = asyncio.Semaphore(concurrency_limit) if concurrency_limit > 0 else None
async def acquire(self, *args, **kwargs):
if self._semaphore:
await self._semaphore.acquire()
async def release(self):
if self._semaphore:
self._semaphore.release()
and the following code that uses these limiters:
async def execute_async(self, body: dict) -> ChatAdapterResponse:
"""
Returns:
ChatAdapterResponse: a ChatAdapterResponse from the model
"""
rate_limiter = RateLimiterA(1) # RateLimterA(1) / RateLimterB(2) both do not work, try any one of the two.
if rate_limiter:
await rate_limiter.acquire()
try:
# async send, is wrapper around requests
response = await async_send_request(
"SomeUrl", body, self._get_headers()
)
return response
finally:
if rate_limiter:
await rate_limiter.release()
When the code was using unittest it works fine, and waits properly:
class TestRateLimiterA():
def test_rate_limitA():
for i in range(self.TEST_COUNT):
start_time = asyncio.get_running_loop().time()
res = await execute_async(
SOME_DICT
)
end_time = asyncio.get_running_loop().time()
print(
f"Finished requests for {model} in {end_time - start_time} seconds"
)
# assert that from the second try onwards, rpm limit = 1 ensure that the request takes at least 60 seconds
if i > 0:
self.assertTrue(end_time - start_time > 60)
)
def test_rate_limiter_B():
start_time = asyncio.get_running_loop().time()
tasks = [
async_execute_on_model(
body
)
for body in bodies # array of 4 dicts
]
results = await asyncio.gather(*tasks)
end_time = asyncio.get_running_loop().time()
print(f"Finished requests in {end_time - start_time} seconds")
self.assertTrue(
end_time - start_time > 5 * (len(bodys) - 1)
)
However when I tried to switch to using pytest, it seems the async loop and waits are not longer waiting, and the following code executes very quickly and does not wait or respect the synchronization objects. Any idea why ?
async def test_rate_limit_A(mock_send_request, 1rpm_rate_limiterA):
for i in range(TEST_COUNT):
start_time = asyncio.get_running_loop().time()
res = await async_execute_on_model(
SomeDict
)
end_time = asyncio.get_running_loop().time()
print(
f"Finished requests for {dict} in {end_time - start_time} seconds"
)
# assert that from the second try onwards, rpm limit = 1 ensure that the request takes at least 60 seconds
if i > 0:
assert end_time - start_time > 60
async def test_rate_limitB(mock_async_requestB, 1rpm_rate_limiterB):
start_time = asyncio.get_running_loop().time()
tasks = [
async_execute_on_model(
body
)
for body in bodies
]
results = await asyncio.gather(*tasks)
end_time = asyncio.get_running_loop().time()
print(f"Finished requests in {end_time - start_time} seconds")
assert end_time - start_time > 5 * (len(bodies) - 1)
for res in results:
Why is this happening, and what am I doing wrong with pytest-async?
You're instantiating a new rate limiter for each request in execute_async
. Each limiter will allow its single request to be executed immediately.
You need to make sure the same limiter instance is used for all requests.